Merge pull request #1328 from spray/master

Tone down error logging by explicitly handling handler termination in TcpConnection
This commit is contained in:
Roland Kuhn 2013-04-12 03:43:21 -07:00
commit d5a658f433
6 changed files with 34 additions and 18 deletions

View file

@ -4,9 +4,11 @@
package akka.io package akka.io
import scala.util.control.NonFatal
import akka.actor._ import akka.actor._
import akka.routing.RandomRouter import akka.routing.RandomRouter
import akka.io.SelectionHandler.WorkerForCommand import akka.io.SelectionHandler.WorkerForCommand
import akka.event.Logging
object IO { object IO {
@ -22,6 +24,8 @@ object IO {
abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor { abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {
override def supervisorStrategy = connectionSupervisorStrategy
val selectorPool = context.actorOf( val selectorPool = context.actorOf(
props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)), props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)),
name = "selectors") name = "selectors")
@ -38,4 +42,18 @@ object IO {
} }
} }
/**
* Special supervisor strategy for parents of TCP connection and listener actors.
* Stops the child on all errors and logs DeathPactExceptions only at debug level.
*/
private[io] final val connectionSupervisorStrategy: SupervisorStrategy =
new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) {
override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
decision: SupervisorStrategy.Directive): Unit =
if (cause.isInstanceOf[DeathPactException]) {
try context.system.eventStream.publish {
Logging.Debug(child.path.toString, getClass, "Closed after handler termination")
} catch { case NonFatal(_) }
} else super.logFailure(context, child, cause, decision)
}
} }

View file

@ -33,7 +33,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
// Needed to send the ConnectionClosed message in the postStop handler. // Needed to send the ConnectionClosed message in the postStop handler.
var closedMessage: CloseInformation = null var closedMessage: CloseInformation = null
var keepOpenOnPeerClosed: Boolean = false private[this] var peerClosed = false
private[this] var keepOpenOnPeerClosed = false
def writePending = pendingWrite ne null def writePending = pendingWrite ne null
@ -44,14 +45,17 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
/** connection established, waiting for registration from user handler */ /** connection established, waiting for registration from user handler */
def waitingForRegistration(commander: ActorRef): Receive = { def waitingForRegistration(commander: ActorRef): Receive = {
case Register(handler, keepOpenOnPeerClosed) case Register(handler, keepOpenOnPeerClosed)
// up to this point we've been watching the commander,
// but since registration is now complete we only need to watch the handler from here on
if (handler != commander) {
context.unwatch(commander)
context.watch(handler)
}
if (TraceLogging) log.debug("[{}] registered as connection handler", handler) if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
this.keepOpenOnPeerClosed = keepOpenOnPeerClosed this.keepOpenOnPeerClosed = keepOpenOnPeerClosed
doRead(handler, None) // immediately try reading doRead(handler, None) // immediately try reading
context.setReceiveTimeout(Duration.Undefined) context.setReceiveTimeout(Duration.Undefined)
context.watch(handler) // sign death pact
context.become(connected(handler)) context.become(connected(handler))
case cmd: CloseCommand case cmd: CloseCommand
@ -208,7 +212,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
// report that peer closed the connection // report that peer closed the connection
handler ! PeerClosed handler ! PeerClosed
// used to check if peer already closed its side later // used to check if peer already closed its side later
channel.socket().shutdownInput() peerClosed = true
context.become(peerSentEOF(handler)) context.become(peerSentEOF(handler))
case _ if writePending // finish writing first case _ if writePending // finish writing first
if (TraceLogging) log.debug("Got Close command but write is still pending.") if (TraceLogging) log.debug("Got Close command but write is still pending.")
@ -217,7 +221,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
if (TraceLogging) log.debug("Got ConfirmedClose command, sending FIN.") if (TraceLogging) log.debug("Got ConfirmedClose command, sending FIN.")
channel.socket.shutdownOutput() channel.socket.shutdownOutput()
if (channel.socket().isInputShutdown) // if peer closed first, the socket is now fully closed if (peerClosed) // if peer closed first, the socket is now fully closed
doCloseConnection(handler, closeCommander, closedEvent) doCloseConnection(handler, closeCommander, closedEvent)
else context.become(closing(handler, closeCommander)) else context.become(closing(handler, closeCommander))
case _ // close now case _ // close now

View file

@ -32,8 +32,6 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
val tcp: TcpExt, val tcp: TcpExt,
val bindCommander: ActorRef, val bindCommander: ActorRef,
val bind: Bind) extends Actor with ActorLogging { val bind: Bind) extends Actor with ActorLogging {
def selector: ActorRef = context.parent
import TcpListener._ import TcpListener._
import tcp.Settings._ import tcp.Settings._
import bind._ import bind._
@ -56,6 +54,8 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT) context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT)
log.debug("Successfully bound to {}", endpoint) log.debug("Successfully bound to {}", endpoint)
override def supervisorStrategy = IO.connectionSupervisorStrategy
def receive: Receive = { def receive: Receive = {
case ChannelRegistered case ChannelRegistered
bindCommander ! Bound bindCommander ! Bound

View file

@ -15,7 +15,7 @@ private[io] class UdpConnectedManager(udpConn: UdpConnectedExt) extends Selector
def receive = workerForCommandHandler { def receive = workerForCommandHandler {
case c: Connect case c: Connect
val commander = sender val commander = sender
Props(new UdpConnectedection(udpConn, commander, c)) Props(new UdpConnection(udpConn, commander, c))
} }
} }

View file

@ -16,7 +16,7 @@ import scala.util.control.NonFatal
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[io] class UdpConnectedection(val udpConn: UdpConnectedExt, private[io] class UdpConnection(val udpConn: UdpConnectedExt,
val commander: ActorRef, val commander: ActorRef,
val connect: Connect) extends Actor with ActorLogging { val connect: Connect) extends Actor with ActorLogging {

View file

@ -12,8 +12,6 @@ import akka.io.Inet.SocketOption
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**
* Base class for TcpIncomingConnection and TcpOutgoingConnection.
*
* INTERNAL API * INTERNAL API
*/ */
private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[SocketOption], val commander: ActorRef)
@ -45,9 +43,5 @@ private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[Sock
case NonFatal(e) log.error(e, "Error closing DatagramChannel") case NonFatal(e) log.error(e, "Error closing DatagramChannel")
} }
} }
override def postRestart(reason: Throwable): Unit =
throw new IllegalStateException("Restarting not supported for connection actors.")
} }