From dcd49bd052373b352fe20024da8483de23d5ac2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 29 Mar 2011 17:07:41 +0200 Subject: [PATCH] Added configuration to define capacity to the remote client buffer messages on failure to send --- .../main/scala/akka/event/EventHandler.scala | 10 +++++----- .../remote/netty/NettyRemoteSupport.scala | 19 +++++++++++++++---- .../src/test/scala/config/ConfigSpec.scala | 2 -- config/akka-reference.conf | 10 +++++----- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index d4fc55b0a9..5b8245d1d4 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -25,7 +25,7 @@ import akka.AkkaException * case EventHandler.Warning(instance, message) => ... * case EventHandler.Info(instance, message) => ... * case EventHandler.Debug(instance, message) => ... - * case genericEvent => ... + * case genericEvent => ... * } * }) * @@ -35,7 +35,7 @@ import akka.AkkaException * *

* However best is probably to register the listener in the 'akka.conf' - * configuration file. + * configuration file. *

* Log an error event: *

@@ -45,7 +45,7 @@ import akka.AkkaException
  * 
  * EventHandler.error(exception, this, message.toString)
  * 
- * + * * @author Jonas Bonér */ object EventHandler extends ListenerManagement { @@ -73,7 +73,7 @@ object EventHandler extends ListenerManagement { val debug = "[DEBUG] [%s] [%s] [%s] %s".intern val generic = "[GENERIC] [%s] [%s]".intern val ID = "event:handler".intern - + class EventHandlerException extends AkkaException lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build @@ -129,7 +129,7 @@ object EventHandler extends ListenerManagement { else if (eventClass.isInstanceOf[Debug]) DebugLevel else DebugLevel } - + class DefaultListener extends Actor { self.id = ID self.dispatcher = EventHandlerDispatcher diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 210b818784..3d603508e8 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -39,6 +39,9 @@ import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import java.util.concurrent._ +import akka.AkkaException + +class RemoteClientMessageBufferException(message: String) extends AkkaException(message) object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -159,7 +162,8 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true) + val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", true) + val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1) val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + @@ -167,7 +171,10 @@ abstract class RemoteClient private[akka] ( protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + protected val pendingRequests = { + if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) + } private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) @@ -243,7 +250,10 @@ abstract class RemoteClient private[akka] ( case e: Throwable => // add the request to the tx log after a failing send notifyListeners(RemoteClientError(e, module, remoteAddress)) - if (useTransactionLog) pendingRequests.add((true, null, request)) + if (useTransactionLog) { + if (!pendingRequests.offer((true, null, request))) + throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") + } else throw e } None @@ -256,7 +266,8 @@ abstract class RemoteClient private[akka] ( def handleRequestReplyError(future: ChannelFuture) = { notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) if (useTransactionLog) { - pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send + if (!pendingRequests.offer((false, futureUuid, request))) // Add the request to the tx log after a failing send + throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") } else { val f = futures.remove(futureUuid) // Clean up future if (f ne null) f.completeWithException(future.getCause) diff --git a/akka-stm/src/test/scala/config/ConfigSpec.scala b/akka-stm/src/test/scala/config/ConfigSpec.scala index 4108a99d63..8636254ced 100644 --- a/akka-stm/src/test/scala/config/ConfigSpec.scala +++ b/akka-stm/src/test/scala/config/ConfigSpec.scala @@ -16,8 +16,6 @@ class ConfigSpec extends WordSpec with MustMatchers { "contain all configuration properties for akka-stm that are used in code with their correct defaults" in { import Config.config._ - getInt("akka.storage.max-retries") must equal(Some(10)) - getBool("akka.stm.blocking-allowed") must equal(Some(false)) getBool("akka.stm.fair") must equal(Some(true)) getBool("akka.stm.interruptible") must equal(Some(false)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 164fae1edc..1c3676ad31 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -146,7 +146,11 @@ akka { } client { - retry-message-send-on-failure = on + buffering { + retry-message-send-on-failure = on + capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) + # If positive then a bounded mailbox is used and the capacity is set using the property + } reconnect-delay = 5 read-timeout = 10 message-frame-size = 1048576 @@ -154,8 +158,4 @@ akka { reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for } } - - storage { - max-retries = 10 - } }