tone down error logging in IO layer, see #3386

* Made defaultDecider available in SupervisorStrategy.
  It turned out that I didn't need it, but I think it could
  be useful anyway, e.g. (a fuller sketch follows after this list):
  override def supervisorStrategy = OneForOneStrategy(
    loggingEnabled = false)(SupervisorStrategy.defaultDecider)
* Verified the following scenarios:
  - client connection failure
  - server bind failure
  - kill client (peer closed)
  - kill server (peer closed)
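
A slightly fuller sketch of that usage (a hypothetical parent actor;
assumes the Akka 2.2 OneForOneStrategy signature with its
loggingEnabled parameter):

  import akka.actor.{ Actor, OneForOneStrategy, Props, SupervisorStrategy }

  // hypothetical: supervise with the default decisions, but
  // suppress the failure logging that defaultStrategy would emit
  class QuietParent extends Actor {
    override val supervisorStrategy =
      OneForOneStrategy(loggingEnabled = false)(SupervisorStrategy.defaultDecider)

    def receive = {
      case props: Props ⇒ sender ! context.actorOf(props)
    }
  }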
Patrik Nordwall 2013-05-31 09:52:51 +02:00
parent 51ed174432
commit 9b59187816
9 changed files with 55 additions and 34 deletions

@@ -527,12 +527,10 @@ class TcpConnectionSpec extends AkkaSpec("""
       run {
         localServerChannel.accept()
-        EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept {
-          selector.send(connectionActor, ChannelConnectable)
-          userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
+        selector.send(connectionActor, ChannelConnectable)
+        userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
         verifyActorTermination(connectionActor)
-        }
       }
     }
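
Since the registration-timeout message now logs at debug level, a test
that still wants to assert it could intercept one level down; a minimal
sketch, assuming akka-testkit's EventFilter.debug and a configured
TestEventListener logger:

  // hedged sketch: same assertion as before, at debug level
  EventFilter.debug(pattern = "registration timeout", occurrences = 1) intercept {
    selector.send(connectionActor, ChannelConnectable)
  }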

@@ -4,6 +4,7 @@
 package akka.io
+import scala.concurrent.duration._
 import akka.testkit.AkkaSpec
 import akka.util.ByteString
 import Tcp._
@@ -32,9 +33,7 @@ class TcpIntegrationSpec extends AkkaSpec("""
     "properly handle connection abort from one side" in new TestSetup {
       val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
-      EventFilter[IOException](occurrences = 1) intercept {
-        clientHandler.send(clientConnection, Abort)
-      }
+      clientHandler.send(clientConnection, Abort)
       clientHandler.expectMsg(Aborted)
       serverHandler.expectMsgType[ErrorClosed]
       verifyActorTermination(clientConnection)

@@ -145,21 +145,28 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
   /**
    * When supervisorStrategy is not specified for an actor this
-   * is used by default. The child will be stopped when
-   * [[akka.actor.ActorInitializationException]] or [[akka.ActorKilledException]]
-   * is thrown. It will be restarted for other `Exception` types.
-   * The error is escalated if it's a `Throwable`, i.e. `Error`.
+   * is used by default. OneForOneStrategy with decider defined in
+   * [[#defaultDecider]].
    */
   final val defaultStrategy: SupervisorStrategy = {
-    def defaultDecider: Decider = {
-      case _: ActorInitializationException ⇒ Stop
-      case _: ActorKilledException         ⇒ Stop
-      case _: DeathPactException           ⇒ Stop
-      case _: Exception                    ⇒ Restart
-    }
     OneForOneStrategy()(defaultDecider)
   }

+  /**
+   * When supervisorStrategy is not specified for an actor this
+   * [[Decider]] is used by default in the supervisor strategy.
+   * The child will be stopped when [[akka.actor.ActorInitializationException]],
+   * [[akka.ActorKilledException]], or [[akka.actor.DeathPactException]] is
+   * thrown. It will be restarted for other `Exception` types.
+   * The error is escalated if it's a `Throwable`, i.e. `Error`.
+   */
+  final def defaultDecider: Decider = {
+    case _: ActorInitializationException ⇒ Stop
+    case _: ActorKilledException         ⇒ Stop
+    case _: DeathPactException           ⇒ Stop
+    case _: Exception                    ⇒ Restart
+  }
+
   /**
    * This strategy resembles Erlang in that failing children are always
    * terminated (one-for-one).
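
Because a Decider is just a PartialFunction[Throwable, Directive], the
newly exposed defaultDecider composes with orElse. A minimal sketch
(hypothetical strategy, not part of this commit) that resumes on
IOException but keeps the default decisions otherwise:

  import java.io.IOException
  import akka.actor.{ OneForOneStrategy, SupervisorStrategy }
  import akka.actor.SupervisorStrategy._

  // hypothetical: tolerate IO hiccups, fall back to the default decisions
  val ioTolerantDecider: SupervisorStrategy.Decider = {
    case _: IOException ⇒ Resume
  }
  val strategy =
    OneForOneStrategy()(ioTolerantDecider orElse SupervisorStrategy.defaultDecider)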

@@ -168,7 +168,7 @@ private[io] object SelectionHandler {
     def tryRun(): Unit = {
       // thorough 'close' of the Selector
       @tailrec def closeNextChannel(it: JIterator[SelectionKey]): Unit = if (it.hasNext) {
-        try it.next().channel.close() catch { case NonFatal(e) ⇒ log.error(e, "Error closing channel") }
+        try it.next().channel.close() catch { case NonFatal(e) ⇒ log.debug("Error closing channel: {}", e) }
         closeNextChannel(it)
       }
       try closeNextChannel(selector.keys.iterator)
@@ -246,7 +246,24 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A
   override def postStop(): Unit = registry.shutdown()

   // we can never recover from failures of a connection or listener child
-  override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
+  // and log the failure at debug level
+  override def supervisorStrategy = {
+    def stoppingDecider: SupervisorStrategy.Decider = {
+      case _: Exception ⇒ SupervisorStrategy.Stop
+    }
+    new OneForOneStrategy()(stoppingDecider) {
+      override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
+                                        decision: SupervisorStrategy.Directive): Unit =
+        try {
+          val logMessage = cause match {
+            case e: ActorInitializationException if e.getCause ne null ⇒ e.getCause.getMessage
+            case e                                                     ⇒ e.getMessage
+          }
+          context.system.eventStream.publish(
+            Logging.Debug(child.path.toString, classOf[SelectionHandler], logMessage))
+        } catch { case NonFatal(_) ⇒ }
+    }
+  }

   def spawnChildWithCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int): Unit = {
     if (TraceLogging) log.debug("Executing [{}]", cmd)
@@ -258,7 +275,7 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A
       if (MaxChannelsPerSelector > 0) context.watch(child) // we don't need to watch if we aren't limited
     } else {
       if (retriesLeft >= 1) {
-        log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft)
+        log.debug("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft)
         context.parent forward Retry(cmd, retriesLeft - 1)
       } else {
         log.warning("Rejecting [{}] with no retries left, aborting...", cmd)

@@ -65,7 +65,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
     case ReceiveTimeout ⇒
       // after sending `Register` user should watch this actor to make sure
       // it didn't die because of the timeout
-      log.warning("Configured registration timeout of [{}] expired, stopping", RegisterTimeout)
+      log.debug("Configured registration timeout of [{}] expired, stopping", RegisterTimeout)
       context.stop(self)
   }
@@ -141,10 +141,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
     case ResumeWriting ⇒
       /*
        * If more than one actor sends Writes then the first to send this
        * message might resume too early for the second, leading to a Write of
        * the second to go through although it has not been resumed yet; there
        * is nothing we can do about this apart from all actors needing to
        * register themselves and us keeping track of them, which sounds bad.
        *
        * Thus it is documented that useResumeWriting is incompatible with

@@ -59,7 +59,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
   } catch {
     case NonFatal(e) ⇒
       bindCommander ! bind.failureMessage
-      log.error(e, "Bind failed for TCP channel on endpoint [{}]", bind.localAddress)
+      log.debug("Bind failed for TCP channel on endpoint [{}]: {}", bind.localAddress, e)
       context.stop(self)
   }
@@ -79,7 +79,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
       log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection")
       try socketChannel.close()
       catch {
-        case NonFatal(e) ⇒ log.error(e, "Error closing channel")
+        case NonFatal(e) ⇒ log.debug("Error closing socket channel: {}", e)
       }
     case Unbind ⇒
@@ -95,7 +95,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
       if (limit > 0) {
         try channel.accept()
         catch {
-          case NonFatal(e) ⇒ { log.error(e, "Accept error: could not accept new connection due to {}", e); null }
+          case NonFatal(e) ⇒ { log.error(e, "Accept error: could not accept new connection"); null }
         }
       } else null
     if (socketChannel != null) {
@@ -115,7 +115,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
         channel.close()
       }
     } catch {
-      case NonFatal(e) ⇒ log.error(e, "Error closing ServerSocketChannel")
+      case NonFatal(e) ⇒ log.debug("Error closing ServerSocketChannel: {}", e)
     }
   }
 }
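
The close-and-log-at-debug pattern recurs in every catch block above; a
hypothetical helper (illustration only, not part of the commit) could
factor it out:

  import java.nio.channels.Channel
  import scala.util.control.NonFatal
  import akka.event.LoggingAdapter

  // hypothetical: close a channel, demoting any failure to a debug entry
  def closeQuietly(c: Channel, what: String)(implicit log: LoggingAdapter): Unit =
    try c.close() catch { case NonFatal(e) ⇒ log.debug("Error closing {}: {}", what, e) }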

@@ -41,8 +41,8 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt,
     datagramChannel.connect(remoteAddress)
   } catch {
     case NonFatal(e) ⇒
-      log.error(e, "Failure while connecting UDP channel to remote address [{}] local address [{}]",
-        remoteAddress, localAddress.map { _.toString }.getOrElse("undefined"))
+      log.debug("Failure while connecting UDP channel to remote address [{}] local address [{}]: {}",
+        remoteAddress, localAddress.getOrElse("undefined"), e)
       commander ! CommandFailed(connect)
       context.stop(self)
   }
@@ -126,7 +126,7 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt,
     log.debug("Closing DatagramChannel after being stopped")
     try channel.close()
     catch {
-      case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel")
+      case NonFatal(e) ⇒ log.debug("Error closing DatagramChannel: {}", e)
     }
   }
 }
} }

@@ -49,7 +49,7 @@ private[io] class UdpListener(val udp: UdpExt,
   } catch {
     case NonFatal(e) ⇒
       bindCommander ! CommandFailed(bind)
-      log.error(e, "Failed to bind UDP channel to endpoint [{}]", bind.localAddress)
+      log.debug("Failed to bind UDP channel to endpoint [{}]: {}", bind.localAddress, e)
       context.stop(self)
   }
@@ -99,7 +99,7 @@ private[io] class UdpListener(val udp: UdpExt,
     log.debug("Closing DatagramChannel after being stopped")
     try channel.close()
     catch {
-      case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel")
+      case NonFatal(e) ⇒ log.debug("Error closing DatagramChannel: {}", e)
     }
   }
 }
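
With bind failures demoted to debug, the CommandFailed reply becomes the
primary failure signal for the commander. A hedged sketch of how a
binding actor might react (hypothetical actor, not part of the commit):

  import akka.actor.{ Actor, ActorLogging }
  import akka.io.Udp

  // hypothetical: the actor that sent Udp.Bind decides how loudly to fail
  class BindSupervisor extends Actor with ActorLogging {
    def receive = {
      case Udp.CommandFailed(b: Udp.Bind) ⇒
        log.error("Could not bind UDP to [{}]", b.localAddress)
        context.stop(self)
    }
  }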

@@ -41,7 +41,7 @@ private[io] class UdpSender(val udp: UdpExt,
     log.debug("Closing DatagramChannel after being stopped")
     try channel.close()
     catch {
-      case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel")
+      case NonFatal(e) ⇒ log.debug("Error closing DatagramChannel: {}", e)
     }
   }
 }