io: change IOException treatment from "fatal error" to "expected during normal operation"

Up to now IOExceptions during reading, writing or connecting were treated as
fatal actor errors, crashing the connection actor and thus producing ERROR
level log messages. This patch treats such exceptions as "expected" during
normal operation and prevents them from crashing the actor. Rather, they are
logged at DEBUG level and the actor is actively and cleanly stopped.
This commit is contained in:
Mathias 2013-06-20 10:22:47 +02:00
parent 8afd7a440d
commit b311e9e700
3 changed files with 35 additions and 37 deletions

View file

@ -457,12 +457,11 @@ class TcpConnectionSpec extends AkkaSpec("""
"report when peer aborted the connection" in new EstablishedConnectionTest() {
run {
EventFilter[IOException](occurrences = 1) intercept {
abortClose(serverSideChannel)
selector.send(connectionActor, ChannelReadable)
val err = connectionHandler.expectMsgType[ErrorClosed]
err.cause must be(ConnectionResetByPeerMessage)
}
abortClose(serverSideChannel)
selector.send(connectionActor, ChannelReadable)
val err = connectionHandler.expectMsgType[ErrorClosed]
err.cause must be(ConnectionResetByPeerMessage)
// wait a while
connectionHandler.expectNoMsg(200.millis)
@ -475,11 +474,9 @@ class TcpConnectionSpec extends AkkaSpec("""
val writer = TestProbe()
abortClose(serverSideChannel)
EventFilter[IOException](occurrences = 1) intercept {
writer.send(connectionActor, Write(ByteString("testdata")))
// bother writer and handler should get the message
writer.expectMsgType[ErrorClosed]
}
writer.send(connectionActor, Write(ByteString("testdata")))
// bother writer and handler should get the message
writer.expectMsgType[ErrorClosed]
connectionHandler.expectMsgType[ErrorClosed]
assertThisConnectionActorTerminated()
@ -501,10 +498,8 @@ class TcpConnectionSpec extends AkkaSpec("""
key.isConnectable must be(true)
val forceThisLazyVal = connectionActor.toString
Thread.sleep(300)
EventFilter[ConnectException](occurrences = 1) intercept {
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress)))
}
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress)))
verifyActorTermination(connectionActor)
} finally sel.close()
@ -516,9 +511,7 @@ class TcpConnectionSpec extends AkkaSpec("""
override lazy val connectionActor = createConnectionActor(serverAddress = UnboundAddress, timeout = Option(100.millis))
run {
connectionActor.toString must not be ("")
EventFilter[SocketTimeoutException](occurrences = 1) intercept {
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress, timeout = Option(100.millis))))
}
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress, timeout = Option(100.millis))))
verifyActorTermination(connectionActor)
}
}

View file

@ -250,13 +250,12 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
def doCloseConnection(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = {
if (closedEvent == Aborted) abort()
else channel.close()
closedMessage = CloseInformation(Set(handler) ++ closeCommander, closedEvent)
context.stop(self)
stopWith(CloseInformation(Set(handler) ++ closeCommander, closedEvent))
}
def handleError(handler: ActorRef, exception: IOException): Nothing = {
closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))
throw exception
def handleError(handler: ActorRef, exception: IOException): Unit = {
log.debug("Closing connection due to IO error {}", exception)
stopWith(CloseInformation(Set(handler), ErrorClosed(extractMsg(exception))))
}
@tailrec private[this] def extractMsg(t: Throwable): String =
@ -279,6 +278,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
channel.close()
}
def stopWith(closeInfo: CloseInformation): Unit = {
closedMessage = closeInfo
context.stop(self)
}
override def postStop(): Unit = {
if (channel.isOpen)
abort()
@ -349,7 +353,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
}
try innerWrite(this)
catch { case e: IOException handleError(info.handler, e) }
catch { case e: IOException handleError(info.handler, e); this }
}
def hasData = buffer.hasRemaining || remainingData.nonEmpty
def consume(writtenBytes: Int): PendingBufferWrite =

View file

@ -4,15 +4,16 @@
package akka.io
import akka.actor.{ ReceiveTimeout, ActorRef }
import akka.io.Inet.SocketOption
import akka.io.SelectionHandler._
import akka.io.Tcp._
import java.io.IOException
import java.nio.channels.{ SelectionKey, SocketChannel }
import java.net.ConnectException
import scala.collection.immutable
import scala.concurrent.duration.Duration
import java.net.{ ConnectException, SocketTimeoutException }
import akka.actor.{ ReceiveTimeout, ActorRef }
import akka.io.Inet.SocketOption
import akka.io.TcpConnection.CloseInformation
import akka.io.SelectionHandler._
import akka.io.Tcp._
/**
* An actor handling the connection state machine for an outgoing connection
@ -45,7 +46,9 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
}
def connecting(registration: ChannelRegistration, commander: ActorRef,
options: immutable.Traversable[SocketOption]): Receive =
options: immutable.Traversable[SocketOption]): Receive = {
def stop(): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage))
{
case ChannelConnectable
if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout
@ -55,16 +58,14 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
completeConnect(registration, commander, options)
} catch {
case e: IOException
if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", e)
closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage)
throw e
log.debug("Could not establish connection to [{}] due to {}", remoteAddress, e)
stop()
}
case ReceiveTimeout
if (timeout.isDefined) context.setReceiveTimeout(Duration.Undefined) // Clear the timeout
val failure = new SocketTimeoutException(s"Connection to [$remoteAddress] timed out")
if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", failure)
closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage)
throw failure
log.debug("Connect timeout expired, could not establish connection to [{}]", remoteAddress)
stop()
}
}
}