#1192 - Removing the 'guaranteed delivery'/message resend in NettyRemoteSupport

This commit is contained in:
Viktor Klang 2011-10-12 14:45:13 +02:00
parent 41029e2f83
commit fe3c22fe23

View file

@ -147,18 +147,11 @@ abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", false)
val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1)
val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
protected val futures = new ConcurrentHashMap[Uuid, Promise[_]]
protected val pendingRequests = {
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
}
private[remote] val runSwitch = new Switch()
@ -172,19 +165,6 @@ abstract class RemoteClient private[akka] (
def shutdown(): Boolean
/**
* Returns an array with the current pending messages not yet delivered.
*/
def pendingMessages: Array[Any] = {
var messages = Vector[Any]()
val iter = pendingRequests.iterator
while (iter.hasNext) {
val (_, _, message) = iter.next
messages = messages :+ MessageSerializer.deserialize(message.getMessage)
}
messages.toArray
}
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
@ -220,13 +200,7 @@ abstract class RemoteClient private[akka] (
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
}
} catch {
case e: Exception
notifyListeners(RemoteClientError(e, module, remoteAddress))
if (useTransactionLog && !pendingRequests.offer((true, null, request))) { // Add the request to the tx log after a failing send
pendingRequests.clear()
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
}
case e: Exception notifyListeners(RemoteClientError(e, module, remoteAddress))
}
None
@ -240,14 +214,8 @@ abstract class RemoteClient private[akka] (
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
def handleRequestReplyError(future: ChannelFuture) = {
if (useTransactionLog && !pendingRequests.offer((false, futureUuid, request))) { // Add the request to the tx log after a failing send
pendingRequests.clear()
throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached")
} else {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
}
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
}
var future: ChannelFuture = null
@ -275,41 +243,6 @@ abstract class RemoteClient private[akka] (
throw exception
}
}
private[remote] def sendPendingRequests() = pendingRequests synchronized {
// ensure only one thread at a time can flush the log
val nrOfMessages = pendingRequests.size
if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
var pendingRequest = pendingRequests.peek
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) {
// tell
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
}
} else {
// ask
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled || !future.isSuccess) {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
}
}
pendingRequests.remove(pendingRequest)
pendingRequest = pendingRequests.peek // try to grab next message
}
}
}
/**
@ -440,7 +373,6 @@ class ActiveRemoteClient private[akka] (
bootstrap.releaseExternalResources()
bootstrap = null
connection = null
pendingRequests.clear()
EventHandler.info(this, "[%s] has been shut down".format(name))
}
@ -555,7 +487,6 @@ class ActiveRemoteClientHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
try {
if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress))
client.resetReconnectionTimeWindow