diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 843abe6a7e..4a4ae90182 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -147,18 +147,11 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val useTransactionLog = config.getBool("akka.remote.client.buffering.retry-message-send-on-failure", false) - val transactionLogCapacity = config.getInt("akka.remote.client.buffering.capacity", -1) - val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] - protected val pendingRequests = { - if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] - else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) - } private[remote] val runSwitch = new Switch() @@ -172,19 +165,6 @@ abstract class RemoteClient private[akka] ( def shutdown(): Boolean - /** - * Returns an array with the current pending messages not yet delivered. - */ - def pendingMessages: Array[Any] = { - var messages = Vector[Any]() - val iter = pendingRequests.iterator - while (iter.hasNext) { - val (_, _, message) = iter.next - messages = messages :+ MessageSerializer.deserialize(message.getMessage) - } - messages.toArray - } - /** * Converts the message to the wireprotocol and sends the message across the wire */ @@ -220,13 +200,7 @@ abstract class RemoteClient private[akka] ( notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) } } catch { - case e: Exception ⇒ - notifyListeners(RemoteClientError(e, module, remoteAddress)) - - if (useTransactionLog && !pendingRequests.offer((true, null, request))) { // Add the request to the tx log after a failing send - pendingRequests.clear() - throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") - } + case e: Exception ⇒ notifyListeners(RemoteClientError(e, module, remoteAddress)) } None @@ -240,14 +214,8 @@ abstract class RemoteClient private[akka] ( futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails def handleRequestReplyError(future: ChannelFuture) = { - if (useTransactionLog && !pendingRequests.offer((false, futureUuid, request))) { // Add the request to the tx log after a failing send - pendingRequests.clear() - throw new RemoteClientMessageBufferException("Buffer limit [" + transactionLogCapacity + "] reached") - - } else { - val f = futures.remove(futureUuid) // Clean up future - if (f ne null) f.completeWithException(future.getCause) - } + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) } var future: ChannelFuture = null @@ -275,41 +243,6 @@ abstract class RemoteClient private[akka] ( throw exception } } - - private[remote] def sendPendingRequests() = pendingRequests synchronized { - // ensure only one thread at a time can flush the log - val nrOfMessages = pendingRequests.size - if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages) - var pendingRequest = pendingRequests.peek - - while (pendingRequest ne null) { - val (isOneWay, futureUuid, message) = pendingRequest - - if (isOneWay) { - // tell - val future = currentChannel.write(RemoteEncoder.encode(message)) - future.awaitUninterruptibly() - - if (future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) - } - - } else { - // ask - val future = currentChannel.write(RemoteEncoder.encode(message)) - future.awaitUninterruptibly() - - if (future.isCancelled || !future.isSuccess) { - val f = futures.remove(futureUuid) // Clean up future - if (f ne null) f.completeWithException(future.getCause) - notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) - } - } - - pendingRequests.remove(pendingRequest) - pendingRequest = pendingRequests.peek // try to grab next message - } - } } /** @@ -440,7 +373,6 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources() bootstrap = null connection = null - pendingRequests.clear() EventHandler.info(this, "[%s] has been shut down".format(name)) } @@ -555,7 +487,6 @@ class ActiveRemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { try { - if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress)) client.resetReconnectionTimeWindow