diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 98a7c75329..3399f68639 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -37,8 +37,16 @@ akka { } remote { + + # Which implementation of akka.remote.RemoteSupport to use + # default is a TCP-based remote transport based on Netty transport = "akka.remote.netty.NettyRemoteSupport" + # In case of increased latency / overflow how long + # should we wait (blocking the sender) until we deem the send to be cancelled? + # 0 means "never backoff", any positive number will indicate time to block at most. + backoff-timeout = 0ms + use-compression = off # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 67f5b6f8c9..c8ce919944 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -23,6 +23,7 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) + val BackoffTimeout = Duration(config.getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) // TODO cluster config will go into akka-cluster/reference.conf when we enable that module val ClusterName = getString("akka.cluster.name") diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 3832d0b8fa..77b805df75 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -18,14 +18,14 @@ import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutExceptio import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import scala.collection.mutable.HashMap import java.net.InetSocketAddress -import java.util.concurrent._ import java.util.concurrent.atomic._ import akka.AkkaException import akka.event.Logging -import locks.ReentrantReadWriteLock import org.jboss.netty.channel._ import akka.actor.ActorSystemImpl import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } +import java.util.concurrent._ +import locks.ReentrantReadWriteLock class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null) @@ -73,7 +73,9 @@ abstract class RemoteClient private[akka] ( */ private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = { try { - currentChannel.write(request).addListener( + val channel = currentChannel + val f = channel.write(request) + f.addListener( new ChannelFutureListener { def operationComplete(future: ChannelFuture) { if (future.isCancelled) { @@ -83,6 +85,12 @@ abstract class RemoteClient private[akka] ( } } }) + // Check if we should back off + if (!channel.isWritable) { + val backoff = remoteSupport.remote.remoteSettings.BackoffTimeout + if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) + f.setFailure(new TimeoutException("akka.remote.backoff-timeout occurred, overpressure on RemoteTransport or nonresponsive server")) + } } catch { case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 1132d1a733..cd8c3c8eb5 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -15,6 +15,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport") getString("akka.remote.secure-cookie") must equal("") getBoolean("akka.remote.use-passive-connections") must equal(true) + getMilliseconds("akka.remote.backoff-timeout") must equal(0) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) //akka.remote.server