diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 843593409c..782405832a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -527,12 +527,10 @@ class TcpConnectionSpec extends AkkaSpec(""" run { localServerChannel.accept() - EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept { - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) - verifyActorTermination(connectionActor) - } + verifyActorTermination(connectionActor) } } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index 52fa79688a..406707ae2b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -4,6 +4,7 @@ package akka.io +import scala.concurrent.duration._ import akka.testkit.AkkaSpec import akka.util.ByteString import Tcp._ @@ -32,9 +33,7 @@ class TcpIntegrationSpec extends AkkaSpec(""" "properly handle connection abort from one side" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() - EventFilter[IOException](occurrences = 1) intercept { - clientHandler.send(clientConnection, Abort) - } + clientHandler.send(clientConnection, Abort) clientHandler.expectMsg(Aborted) serverHandler.expectMsgType[ErrorClosed] verifyActorTermination(clientConnection) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index a6e3febbff..99d54af15f 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -145,21 +145,28 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { /** * When supervisorStrategy is not specified for an actor this - * is used by default. The child will be stopped when - * [[akka.actor.ActorInitializationException]] or [[akka.ActorKilledException]] - * is thrown. It will be restarted for other `Exception` types. - * The error is escalated if it's a `Throwable`, i.e. `Error`. + * is used by default. OneForOneStrategy with decider defined in + * [[#defaultDecider]]. */ final val defaultStrategy: SupervisorStrategy = { - def defaultDecider: Decider = { - case _: ActorInitializationException ⇒ Stop - case _: ActorKilledException ⇒ Stop - case _: DeathPactException ⇒ Stop - case _: Exception ⇒ Restart - } OneForOneStrategy()(defaultDecider) } + /** + * When supervisorStrategy is not specified for an actor this + * [[Decider]] is used by default in the supervisor strategy. + * The child will be stopped when [[akka.actor.ActorInitializationException]], + * [[akka.ActorKilledException]], or [[akka.actor.DeathPactException]] is + * thrown. It will be restarted for other `Exception` types. + * The error is escalated if it's a `Throwable`, i.e. `Error`. + */ + final def defaultDecider: Decider = { + case _: ActorInitializationException ⇒ Stop + case _: ActorKilledException ⇒ Stop + case _: DeathPactException ⇒ Stop + case _: Exception ⇒ Restart + } + /** * This strategy resembles Erlang in that failing children are always * terminated (one-for-one). diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index b3a2486163..ceaeadf98c 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -168,7 +168,7 @@ private[io] object SelectionHandler { def tryRun(): Unit = { // thorough 'close' of the Selector @tailrec def closeNextChannel(it: JIterator[SelectionKey]): Unit = if (it.hasNext) { - try it.next().channel.close() catch { case NonFatal(e) ⇒ log.error(e, "Error closing channel") } + try it.next().channel.close() catch { case NonFatal(e) ⇒ log.debug("Error closing channel: {}", e) } closeNextChannel(it) } try closeNextChannel(selector.keys.iterator) @@ -246,7 +246,24 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A override def postStop(): Unit = registry.shutdown() // we can never recover from failures of a connection or listener child - override def supervisorStrategy = SupervisorStrategy.stoppingStrategy + // and log the failure at debug level + override def supervisorStrategy = { + def stoppingDecider: SupervisorStrategy.Decider = { + case _: Exception ⇒ SupervisorStrategy.Stop + } + new OneForOneStrategy()(stoppingDecider) { + override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, + decision: SupervisorStrategy.Directive): Unit = + try { + val logMessage = cause match { + case e: ActorInitializationException if e.getCause ne null ⇒ e.getCause.getMessage + case e ⇒ e.getMessage + } + context.system.eventStream.publish( + Logging.Debug(child.path.toString, classOf[SelectionHandler], logMessage)) + } catch { case NonFatal(_) ⇒ } + } + } def spawnChildWithCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int): Unit = { if (TraceLogging) log.debug("Executing [{}]", cmd) @@ -258,7 +275,7 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A if (MaxChannelsPerSelector > 0) context.watch(child) // we don't need to watch if we aren't limited } else { if (retriesLeft >= 1) { - log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft) + log.debug("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft) context.parent forward Retry(cmd, retriesLeft - 1) } else { log.warning("Rejecting [{}] with no retries left, aborting...", cmd) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index ec8b893761..956b500c2c 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -65,7 +65,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha case ReceiveTimeout ⇒ // after sending `Register` user should watch this actor to make sure // it didn't die because of the timeout - log.warning("Configured registration timeout of [{}] expired, stopping", RegisterTimeout) + log.debug("Configured registration timeout of [{}] expired, stopping", RegisterTimeout) context.stop(self) } @@ -141,10 +141,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha case ResumeWriting ⇒ /* - * If more than one actor sends Writes then the first to send this + * If more than one actor sends Writes then the first to send this * message might resume too early for the second, leading to a Write of * the second to go through although it has not been resumed yet; there - * is nothing we can do about this apart from all actors needing to + * is nothing we can do about this apart from all actors needing to * register themselves and us keeping track of them, which sounds bad. * * Thus it is documented that useResumeWriting is incompatible with diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 6bd93baa60..040e09ef88 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -59,7 +59,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, } catch { case NonFatal(e) ⇒ bindCommander ! bind.failureMessage - log.error(e, "Bind failed for TCP channel on endpoint [{}]", bind.localAddress) + log.debug("Bind failed for TCP channel on endpoint [{}]: {}", bind.localAddress, e) context.stop(self) } @@ -79,7 +79,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection") try socketChannel.close() catch { - case NonFatal(e) ⇒ log.error(e, "Error closing channel") + case NonFatal(e) ⇒ log.debug("Error closing socket channel: {}", e) } case Unbind ⇒ @@ -95,7 +95,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, if (limit > 0) { try channel.accept() catch { - case NonFatal(e) ⇒ { log.error(e, "Accept error: could not accept new connection due to {}", e); null } + case NonFatal(e) ⇒ { log.error(e, "Accept error: could not accept new connection"); null } } } else null if (socketChannel != null) { @@ -115,7 +115,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, channel.close() } } catch { - case NonFatal(e) ⇒ log.error(e, "Error closing ServerSocketChannel") + case NonFatal(e) ⇒ log.debug("Error closing ServerSocketChannel: {}", e) } } } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 3eb6778b25..4e9b1cdf5e 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -41,8 +41,8 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt, datagramChannel.connect(remoteAddress) } catch { case NonFatal(e) ⇒ - log.error(e, "Failure while connecting UDP channel to remote address [{}] local address [{}]", - remoteAddress, localAddress.map { _.toString }.getOrElse("undefined")) + log.debug("Failure while connecting UDP channel to remote address [{}] local address [{}]: {}", + remoteAddress, localAddress.getOrElse("undefined"), e) commander ! CommandFailed(connect) context.stop(self) } @@ -126,7 +126,7 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt, log.debug("Closing DatagramChannel after being stopped") try channel.close() catch { - case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel") + case NonFatal(e) ⇒ log.debug("Error closing DatagramChannel: {}", e) } } } diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index b4ab0e2caf..088a48023a 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -49,7 +49,7 @@ private[io] class UdpListener(val udp: UdpExt, } catch { case NonFatal(e) ⇒ bindCommander ! CommandFailed(bind) - log.error(e, "Failed to bind UDP channel to endpoint [{}]", bind.localAddress) + log.debug("Failed to bind UDP channel to endpoint [{}]: {}", bind.localAddress, e) context.stop(self) } @@ -99,7 +99,7 @@ private[io] class UdpListener(val udp: UdpExt, log.debug("Closing DatagramChannel after being stopped") try channel.close() catch { - case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel") + case NonFatal(e) ⇒ log.debug("Error closing DatagramChannel: {}", e) } } } diff --git a/akka-actor/src/main/scala/akka/io/UdpSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala index 925c6dbf92..85c773754d 100644 --- a/akka-actor/src/main/scala/akka/io/UdpSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala @@ -41,7 +41,7 @@ private[io] class UdpSender(val udp: UdpExt, log.debug("Closing DatagramChannel after being stopped") try channel.close() catch { - case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel") + case NonFatal(e) ⇒ log.debug("Error closing DatagramChannel: {}", e) } } }