diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 06e3caeac8..a5d2ceb58d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -55,7 +55,6 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { akka.remote.log-remote-lifecycle-events = off akka.remote.netty.execution-pool-size = 4 #akka.remote.netty.reconnection-time-window = 10s - akka.remote.netty.read-timeout = 5s akka.remote.netty.write-timeout = 5s akka.remote.netty.backoff-timeout = 500ms akka.remote.netty.connection-timeout = 500ms diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index cc8818a3be..f0f78f3ec3 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -171,10 +171,12 @@ akka { # (O) Time between reconnect attempts for active clients reconnect-delay = 5s - # (O) Read inactivity period (lowest resolution is seconds) + # (O) Client read inactivity period (finest resolution is seconds) # after which active client connection is shutdown; - # will be re-established in case of new communication requests. + # Connection will be re-established in case of new communication requests. # A value of 0 will turn this feature off + # This value should be left to be 0 when use-passive-connections is off, or if + # no traffic is expected from the server side (i.e. it is a sink). read-timeout = 0s # (O) Write inactivity period (lowest resolution is seconds) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 2015f82ac7..7568f859ec 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -21,6 +21,7 @@ import akka.util.Switch import scala.util.control.NonFatal import org.jboss.netty.handler.ssl.SslHandler import scala.concurrent.duration._ +import java.nio.channels.ClosedChannelException /** * This is the abstract baseclass for netty remote clients, currently there's only an @@ -63,21 +64,23 @@ private[akka] abstract class RemoteClient private[akka] (val netty: NettyRemoteT private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = { try { val channel = currentChannel - val f = channel.write(request) - f.addListener( - new ChannelFutureListener { - import netty.system.deadLetters - def operationComplete(future: ChannelFuture): Unit = - if (future.isCancelled || !future.isSuccess) request match { - case (msg, sender, recipient) ⇒ deadLetters ! DeadLetter(msg, sender.getOrElse(deadLetters), recipient) - // We don't call notifyListeners here since we don't think failed message deliveries are errors - /// If the connection goes down we'll get the error reporting done by the pipeline. - } - }) - // Check if we should back off - if (!channel.isWritable) { - val backoff = netty.settings.BackoffTimeout - if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off + if (channel.isOpen) { + val f = channel.write(request) + f.addListener( + new ChannelFutureListener { + import netty.system.deadLetters + def operationComplete(future: ChannelFuture): Unit = + if (future.isCancelled || !future.isSuccess) request match { + case (msg, sender, recipient) ⇒ deadLetters ! DeadLetter(msg, sender.getOrElse(deadLetters), recipient) + // We don't call notifyListeners here since we don't think failed message deliveries are errors + /// If the connection goes down we'll get the error reporting done by the pipeline. + } + }) + // Check if we should back off + if (!channel.isWritable) { + val backoff = netty.settings.BackoffTimeout + if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off + } } } catch { case NonFatal(e) ⇒ netty.notifyListeners(RemoteClientError(e, netty, remoteAddress)) @@ -195,8 +198,11 @@ private[akka] class ActiveRemoteClient private[akka] ( notifyListeners(RemoteClientShutdown(netty, remoteAddress)) try { if ((connection ne null) && (connection.getChannel ne null)) { - ChannelAddress.remove(connection.getChannel) - connection.getChannel.close() + val channel = connection.getChannel + ChannelAddress.remove(channel) + // Try to disconnect first to reduce "connection reset by peer" events + if (channel.isConnected) channel.disconnect() + if (channel.isOpen) channel.close() } } finally { try { @@ -267,10 +273,8 @@ private[akka] class ActiveRemoteClientHandler( case CommandType.SHUTDOWN ⇒ runOnceNow { client.netty.shutdownClientConnection(remoteAddress) } case _ ⇒ //Ignore others } - case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ client.netty.receiveMessage(new RemoteMessage(arp.getMessage, client.netty.system)) - case other ⇒ throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.netty, client.remoteAddress) } @@ -307,9 +311,14 @@ private[akka] class ActiveRemoteClientHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - val cause = if (event.getCause ne null) event.getCause else new Exception("Unknown cause") - client.notifyListeners(RemoteClientError(cause, client.netty, client.remoteAddress)) - event.getChannel.close() + val cause = if (event.getCause ne null) event.getCause else new AkkaException("Unknown cause") + cause match { + case _: ClosedChannelException ⇒ // Ignore + case NonFatal(e) ⇒ + client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress)) + event.getChannel.close() + case e: Throwable ⇒ throw e // Rethrow fatals + } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 1f083bcab5..9f0eb14e1c 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -3,21 +3,19 @@ */ package akka.remote.netty +import akka.actor.Address +import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } +import akka.remote._ +import java.net.InetAddress import java.net.InetSocketAddress +import java.nio.channels.ClosedChannelException import java.util.concurrent.Executors -import scala.Option.option2Iterable import org.jboss.netty.bootstrap.ServerBootstrap -import org.jboss.netty.channel.ChannelHandler.Sharable +import org.jboss.netty.channel._ import org.jboss.netty.channel.group.ChannelGroup import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } -import org.jboss.netty.handler.execution.ExecutionHandler -import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } -import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage } -import akka.actor.Address -import java.net.InetAddress -import akka.actor.ActorSystemImpl -import org.jboss.netty.channel._ +import scala.util.control.NonFatal +import akka.AkkaException private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { @@ -155,7 +153,6 @@ private[akka] class RemoteServerHandler( event.getMessage match { case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ netty.receiveMessage(new RemoteMessage(remote.getMessage, netty.system)) - case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ val instruction = remote.getInstruction instruction.getCommandType match { @@ -180,8 +177,14 @@ private[akka] class RemoteServerHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - netty.notifyListeners(RemoteServerError(event.getCause, netty)) - event.getChannel.close() + val cause = if (event.getCause ne null) event.getCause else new AkkaException("Unknown cause") + cause match { + case _: ClosedChannelException ⇒ // Ignore + case NonFatal(e) ⇒ + netty.notifyListeners(RemoteServerError(e, netty)) + event.getChannel.close() + case e: Throwable ⇒ throw e // Rethrow fatals + } } }