diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index ba83c58c0c..a51022c95f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -5,6 +5,7 @@ package akka.remote.netty import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} +import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.serialization.RemoteActorSerialization @@ -31,12 +32,11 @@ import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} -import akka.remoteinterface.{RemoteEventHandler} -import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import java.util.concurrent._ object RemoteEncoder { @@ -163,7 +163,7 @@ abstract class RemoteClient private[akka] ( protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) @@ -178,6 +178,13 @@ abstract class RemoteClient private[akka] ( def shutdown: Boolean + /** + * Returns an array with the current pending messages not yet delivered. + */ + def pendingMessages: Array[Any] = pendingRequests + .toArray.asInstanceOf[Array[(Boolean, Uuid, RemoteMessageProtocol)]] + .map(req => MessageSerializer.deserialize(req._3.getMessage)) + /** * Converts the message to the wireprotocol and sends the message across the wire */ @@ -215,7 +222,7 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { if (isRunning) { if (request.getOneWay) { - pendingMessages.add((true, null, request)) + pendingRequests.add((true, null, request)) sendPendingMessages() None } else { @@ -223,7 +230,7 @@ abstract class RemoteClient private[akka] ( else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails - pendingMessages.add((false, futureUuid, request)) + pendingRequests.add((false, futureUuid, request)) sendPendingMessages() Some(futureResult) } @@ -235,7 +242,7 @@ abstract class RemoteClient private[akka] ( } private[remote] def sendPendingMessages() = { - var pendingMessage = pendingMessages.peek // try to grab first message + var pendingMessage = pendingRequests.peek // try to grab first message while (pendingMessage ne null) { val (isOneWay, futureUuid, message) = pendingMessage if (isOneWay) { // sendOneWay @@ -256,8 +263,8 @@ abstract class RemoteClient private[akka] ( notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) } } - pendingMessages.remove(pendingMessage) // message delivered; remove from tx log - pendingMessage = pendingMessages.peek // try to grab next message + pendingRequests.remove(pendingMessage) // message delivered; remove from tx log + pendingMessage = pendingRequests.peek // try to grab next message } } @@ -352,7 +359,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources bootstrap = null connection = null - pendingMessages.clear + pendingRequests.clear } private[akka] def isWithinReconnectionTimeWindow: Boolean = {