diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala index e238ffbaf2..653dab0473 100644 --- a/akka-actor/src/main/scala/akka/io/IO.scala +++ b/akka-actor/src/main/scala/akka/io/IO.scala @@ -4,9 +4,11 @@ package akka.io +import scala.util.control.NonFatal import akka.actor._ import akka.routing.RandomRouter import akka.io.SelectionHandler.WorkerForCommand +import akka.event.Logging object IO { @@ -22,6 +24,8 @@ object IO { abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor { + override def supervisorStrategy = connectionSupervisorStrategy + val selectorPool = context.actorOf( props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)), name = "selectors") @@ -38,4 +42,18 @@ object IO { } } + /** + * Special supervisor strategy for parents of TCP connection and listener actors. + * Stops the child on all errors and logs DeathPactExceptions only at debug level. + */ + private[io] final val connectionSupervisorStrategy: SupervisorStrategy = + new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) { + override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, + decision: SupervisorStrategy.Directive): Unit = + if (cause.isInstanceOf[DeathPactException]) { + try context.system.eventStream.publish { + Logging.Debug(child.path.toString, getClass, "Closed after handler termination") + } catch { case NonFatal(_) ⇒ } + } else super.logFailure(context, child, cause, decision) + } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index f6dc9aaf26..9e5629adbb 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -33,7 +33,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, // Needed to send the ConnectionClosed message in the postStop handler. var closedMessage: CloseInformation = null - var keepOpenOnPeerClosed: Boolean = false + private[this] var peerClosed = false + private[this] var keepOpenOnPeerClosed = false def writePending = pendingWrite ne null @@ -44,14 +45,17 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, /** connection established, waiting for registration from user handler */ def waitingForRegistration(commander: ActorRef): Receive = { case Register(handler, keepOpenOnPeerClosed) ⇒ + // up to this point we've been watching the commander, + // but since registration is now complete we only need to watch the handler from here on + if (handler != commander) { + context.unwatch(commander) + context.watch(handler) + } if (TraceLogging) log.debug("[{}] registered as connection handler", handler) this.keepOpenOnPeerClosed = keepOpenOnPeerClosed doRead(handler, None) // immediately try reading - context.setReceiveTimeout(Duration.Undefined) - context.watch(handler) // sign death pact - context.become(connected(handler)) case cmd: CloseCommand ⇒ @@ -208,7 +212,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, // report that peer closed the connection handler ! PeerClosed // used to check if peer already closed its side later - channel.socket().shutdownInput() + peerClosed = true context.become(peerSentEOF(handler)) case _ if writePending ⇒ // finish writing first if (TraceLogging) log.debug("Got Close command but write is still pending.") @@ -217,7 +221,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, if (TraceLogging) log.debug("Got ConfirmedClose command, sending FIN.") channel.socket.shutdownOutput() - if (channel.socket().isInputShutdown) // if peer closed first, the socket is now fully closed + if (peerClosed) // if peer closed first, the socket is now fully closed doCloseConnection(handler, closeCommander, closedEvent) else context.become(closing(handler, closeCommander)) case _ ⇒ // close now diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index d8d93d214d..30baaa4c06 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -32,8 +32,6 @@ private[io] class TcpListener(val selectorRouter: ActorRef, val tcp: TcpExt, val bindCommander: ActorRef, val bind: Bind) extends Actor with ActorLogging { - - def selector: ActorRef = context.parent import TcpListener._ import tcp.Settings._ import bind._ @@ -56,6 +54,8 @@ private[io] class TcpListener(val selectorRouter: ActorRef, context.parent ! RegisterChannel(channel, SelectionKey.OP_ACCEPT) log.debug("Successfully bound to {}", endpoint) + override def supervisorStrategy = IO.connectionSupervisorStrategy + def receive: Receive = { case ChannelRegistered ⇒ bindCommander ! Bound diff --git a/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala index 284d1b9679..70ef39d5e3 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala @@ -15,7 +15,7 @@ private[io] class UdpConnectedManager(udpConn: UdpConnectedExt) extends Selector def receive = workerForCommandHandler { case c: Connect ⇒ val commander = sender - Props(new UdpConnectedection(udpConn, commander, c)) + Props(new UdpConnection(udpConn, commander, c)) } } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 8e7869a53c..6beded47c8 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -16,9 +16,9 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -private[io] class UdpConnectedection(val udpConn: UdpConnectedExt, - val commander: ActorRef, - val connect: Connect) extends Actor with ActorLogging { +private[io] class UdpConnection(val udpConn: UdpConnectedExt, + val commander: ActorRef, + val connect: Connect) extends Actor with ActorLogging { def selector: ActorRef = context.parent diff --git a/akka-actor/src/main/scala/akka/io/UdpSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala index af3df740c5..5f45550591 100644 --- a/akka-actor/src/main/scala/akka/io/UdpSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala @@ -12,8 +12,6 @@ import akka.io.Inet.SocketOption import scala.util.control.NonFatal /** - * Base class for TcpIncomingConnection and TcpOutgoingConnection. - * * INTERNAL API */ private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) @@ -45,9 +43,5 @@ private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[Sock case NonFatal(e) ⇒ log.error(e, "Error closing DatagramChannel") } } - - override def postRestart(reason: Throwable): Unit = - throw new IllegalStateException("Restarting not supported for connection actors.") - }