Added configuration to define capacity to the remote client buffer messages on failure to send

This commit is contained in:
Jonas Bonér 2011-03-29 17:07:41 +02:00
parent ec76287a78
commit dcd49bd052
4 changed files with 25 additions and 16 deletions

View file

@ -25,7 +25,7 @@ import akka.AkkaException
* case EventHandler.Warning(instance, message) => ...
* case EventHandler.Info(instance, message) => ...
* case EventHandler.Debug(instance, message) => ...
* case genericEvent => ...
* case genericEvent => ...
* }
* })
*
@ -35,7 +35,7 @@ import akka.AkkaException
* </pre>
* <p/>
* However best is probably to register the listener in the 'akka.conf'
* configuration file.
* configuration file.
* <p/>
* Log an error event:
* <pre>
@ -45,7 +45,7 @@ import akka.AkkaException
* <pre>
* EventHandler.error(exception, this, message.toString)
* </pre>
*
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object EventHandler extends ListenerManagement {
@ -73,7 +73,7 @@ object EventHandler extends ListenerManagement {
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
val generic = "[GENERIC] [%s] [%s]".intern
val ID = "event:handler".intern
class EventHandlerException extends AkkaException
lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(ID).build
@ -129,7 +129,7 @@ object EventHandler extends ListenerManagement {
else if (eventClass.isInstanceOf[Debug]) DebugLevel
else DebugLevel
}
class DefaultListener extends Actor {
self.id = ID
self.dispatcher = EventHandlerDispatcher

View file

@ -39,6 +39,9 @@ import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.concurrent._
import akka.AkkaException
class RemoteClientMessageBufferException(message: String) extends AkkaException(message)
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -159,7 +162,8 @@ abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val useTransactionLog = config.getBool("akka.remote.retry-message-send-on-failure", true)
val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", true)
val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1)
val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
@ -167,7 +171,10 @@ abstract class RemoteClient private[akka] (
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
protected val pendingRequests = {
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
}
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
@ -243,7 +250,10 @@ abstract class RemoteClient private[akka] (
case e: Throwable =>
// add the request to the tx log after a failing send
notifyListeners(RemoteClientError(e, module, remoteAddress))
if (useTransactionLog) pendingRequests.add((true, null, request))
if (useTransactionLog) {
if (!pendingRequests.offer((true, null, request)))
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
}
else throw e
}
None
@ -256,7 +266,8 @@ abstract class RemoteClient private[akka] (
def handleRequestReplyError(future: ChannelFuture) = {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
if (useTransactionLog) {
pendingRequests.add((false, futureUuid, request)) // Add the request to the tx log after a failing send
if (!pendingRequests.offer((false, futureUuid, request))) // Add the request to the tx log after a failing send
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
} else {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)

View file

@ -16,8 +16,6 @@ class ConfigSpec extends WordSpec with MustMatchers {
"contain all configuration properties for akka-stm that are used in code with their correct defaults" in {
import Config.config._
getInt("akka.storage.max-retries") must equal(Some(10))
getBool("akka.stm.blocking-allowed") must equal(Some(false))
getBool("akka.stm.fair") must equal(Some(true))
getBool("akka.stm.interruptible") must equal(Some(false))

View file

@ -146,7 +146,11 @@ akka {
}
client {
retry-message-send-on-failure = on
buffering {
retry-message-send-on-failure = on
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
}
reconnect-delay = 5
read-timeout = 10
message-frame-size = 1048576
@ -154,8 +158,4 @@ akka {
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
}
}
storage {
max-retries = 10
}
}