Added accessor for pending messages

This commit is contained in:
Jonas Bonér 2011-03-25 08:36:43 +01:00
parent 4578dd26cb
commit 331b5c7b74

View file

@ -5,6 +5,7 @@
package akka.remote.netty
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.serialization.RemoteActorSerialization
@ -31,12 +32,11 @@ import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor,
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import akka.remoteinterface.{RemoteEventHandler}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import java.util.concurrent._
object RemoteEncoder {
@ -163,7 +163,7 @@ abstract class RemoteClient private[akka] (
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
protected val pendingRequests = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
@ -178,6 +178,13 @@ abstract class RemoteClient private[akka] (
def shutdown: Boolean
/**
* Returns an array with the current pending messages not yet delivered.
*/
def pendingMessages: Array[Any] = pendingRequests
.toArray.asInstanceOf[Array[(Boolean, Uuid, RemoteMessageProtocol)]]
.map(req => MessageSerializer.deserialize(req._3.getMessage))
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
@ -215,7 +222,7 @@ abstract class RemoteClient private[akka] (
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
pendingMessages.add((true, null, request))
pendingRequests.add((true, null, request))
sendPendingMessages()
None
} else {
@ -223,7 +230,7 @@ abstract class RemoteClient private[akka] (
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
pendingMessages.add((false, futureUuid, request))
pendingRequests.add((false, futureUuid, request))
sendPendingMessages()
Some(futureResult)
}
@ -235,7 +242,7 @@ abstract class RemoteClient private[akka] (
}
private[remote] def sendPendingMessages() = {
var pendingMessage = pendingMessages.peek // try to grab first message
var pendingMessage = pendingRequests.peek // try to grab first message
while (pendingMessage ne null) {
val (isOneWay, futureUuid, message) = pendingMessage
if (isOneWay) { // sendOneWay
@ -256,8 +263,8 @@ abstract class RemoteClient private[akka] (
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
}
}
pendingMessages.remove(pendingMessage) // message delivered; remove from tx log
pendingMessage = pendingMessages.peek // try to grab next message
pendingRequests.remove(pendingMessage) // message delivered; remove from tx log
pendingMessage = pendingRequests.peek // try to grab next message
}
}
@ -352,7 +359,7 @@ class ActiveRemoteClient private[akka] (
bootstrap.releaseExternalResources
bootstrap = null
connection = null
pendingMessages.clear
pendingRequests.clear
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {