Adding backoff-timeout to remote configuration

This commit is contained in:
Viktor Klang 2011-12-30 17:24:23 +01:00
parent 7c02287b66
commit dfcd1571fd
4 changed files with 21 additions and 3 deletions

View file

@ -37,8 +37,16 @@ akka {
} }
remote { remote {
# Which implementation of akka.remote.RemoteSupport to use
# default is a TCP-based remote transport based on Netty
transport = "akka.remote.netty.NettyRemoteSupport" transport = "akka.remote.netty.NettyRemoteSupport"
# In case of increased latency / overflow how long
# should we wait (blocking the sender) until we deem the send to be cancelled?
# 0 means "never backoff", any positive number will indicate time to block at most.
backoff-timeout = 0ms
use-compression = off use-compression = off
# Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'

View file

@ -23,6 +23,7 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
val BackoffTimeout = Duration(config.getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS)
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module // TODO cluster config will go into akka-cluster/reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name") val ClusterName = getString("akka.cluster.name")

View file

@ -18,14 +18,14 @@ import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutExceptio
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent._
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
import akka.AkkaException import akka.AkkaException
import akka.event.Logging import akka.event.Logging
import locks.ReentrantReadWriteLock
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import akka.actor.ActorSystemImpl import akka.actor.ActorSystemImpl
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
import java.util.concurrent._
import locks.ReentrantReadWriteLock
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null) def this(msg: String) = this(msg, null)
@ -73,7 +73,9 @@ abstract class RemoteClient private[akka] (
*/ */
private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = { private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = {
try { try {
currentChannel.write(request).addListener( val channel = currentChannel
val f = channel.write(request)
f.addListener(
new ChannelFutureListener { new ChannelFutureListener {
def operationComplete(future: ChannelFuture) { def operationComplete(future: ChannelFuture) {
if (future.isCancelled) { if (future.isCancelled) {
@ -83,6 +85,12 @@ abstract class RemoteClient private[akka] (
} }
} }
}) })
// Check if we should back off
if (!channel.isWritable) {
val backoff = remoteSupport.remote.remoteSettings.BackoffTimeout
if (backoff.length > 0 && !f.await(backoff.length, backoff.unit))
f.setFailure(new TimeoutException("akka.remote.backoff-timeout occurred, overpressure on RemoteTransport or nonresponsive server"))
}
} catch { } catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
} }

View file

@ -15,6 +15,7 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") {
getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport") getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport")
getString("akka.remote.secure-cookie") must equal("") getString("akka.remote.secure-cookie") must equal("")
getBoolean("akka.remote.use-passive-connections") must equal(true) getBoolean("akka.remote.use-passive-connections") must equal(true)
getMilliseconds("akka.remote.backoff-timeout") must equal(0)
// getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
//akka.remote.server //akka.remote.server