1. Added a 'pending-messages' tx log for all pending messages that are not yet delivered to the remote host, this tx log is retried upon successful remote client reconnect.

2. Fixed broken code in UnparsableException and renamed it to UnknownRemoteException.
This commit is contained in:
Jonas Bonér 2011-03-24 16:23:37 +01:00
parent dfcbf75011
commit cd93f5750f
3 changed files with 67 additions and 40 deletions

View file

@ -14,7 +14,10 @@ package object actor {
ref.asInstanceOf[ActorRef]
type Uuid = com.eaio.uuid.UUID
def newUuid(): Uuid = new Uuid()
def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode)
def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode)
def uuidFrom(uuid: String): Uuid = new Uuid(uuid)
}

View file

@ -14,6 +14,7 @@ import scala.reflect.BeanProperty
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import java.io.{PrintWriter, PrintStream}
trait RemoteModule {
val UUID_PREFIX = "uuid:"
@ -113,16 +114,21 @@ case class RemoteServerWriteFailed(
/**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
*/
class RemoteClientException private[akka] (message: String,
@BeanProperty val client: RemoteClientModule,
val remoteAddress: InetSocketAddress) extends AkkaException(message)
class RemoteClientException private[akka] (
message: String,
@BeanProperty val client: RemoteClientModule,
val remoteAddress: InetSocketAddress) extends AkkaException(message)
/**
* Returned when a remote exception cannot be instantiated or parsed
* Returned when a remote exception sent over the wire cannot be loaded and instantiated
*/
case class UnparsableException private[akka] (originalClassName: String,
originalMessage: String) extends AkkaException(originalMessage)
case class UnknownRemoteException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String)
extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]"
.format(cause.toString, originalClassName, originalMessage)) {
override def printStackTrace = cause.printStackTrace
override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream)
override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter)
}
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {

View file

@ -34,10 +34,10 @@ import scala.collection.mutable.HashMap
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap }
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import akka.remoteinterface.{RemoteEventHandler}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import java.util.concurrent._
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -157,19 +157,25 @@ abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort
val name = this.getClass.getSimpleName + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
private[remote] val runSwitch = new Switch()
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
protected def notifyListeners(msg: => Any); Unit
protected def currentChannel: Channel
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
def shutdown: Boolean
/**
@ -207,36 +213,19 @@ abstract class RemoteClient private[akka] (
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
txLog.add(request)
val future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
} else
pendingMessages.add((true, null, request))
sendPendingMessages()
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
futures.remove(futureUuid) //Clean this up
//We don't care about that right now
} else if (!future.isSuccess) {
val f = futures.remove(futureUuid) //Clean this up
if (f ne null)
f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
}
}
})
Some(futureResult)
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
pendingMessages.add((false, futureUuid, request))
sendPendingMessages()
Some(futureResult)
}
} else {
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress)
@ -245,6 +234,32 @@ abstract class RemoteClient private[akka] (
}
}
private[remote] def sendPendingMessages() = {
var pendingMessage = pendingMessages.poll
while (pendingMessage ne null) {
val (isOneWay, futureUuid, message) = pendingMessage
if (isOneWay) {
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
throw future.getCause
}
} else {
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled) {
futures.remove(futureUuid) // Clean up future
} else if (!future.isSuccess) {
val f = futures.remove(futureUuid) // Clean up future
if (f ne null) f.completeWithException(future.getCause)
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
}
}
pendingMessage = pendingMessages.poll
}
}
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't register supervisor for " + actorRef + " since it is not under supervision")
@ -265,6 +280,7 @@ class ActiveRemoteClient private[akka] (
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile private var bootstrap: ClientBootstrap = _
@volatile private[remote] var connection: ChannelFuture = _
@ -335,6 +351,7 @@ class ActiveRemoteClient private[akka] (
bootstrap.releaseExternalResources
bootstrap = null
connection = null
pendingMessages.clear
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@ -449,6 +466,7 @@ class ActiveRemoteClientHandler(
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.sendPendingMessages() // try to send pending message (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
}
@ -479,7 +497,7 @@ class ActiveRemoteClientHandler(
} catch {
case problem: Throwable =>
EventHandler.error(problem, this, problem.getMessage)
UnparsableException(classname, exception.getMessage)
UnknownRemoteException(problem, classname, exception.getMessage)
}
}
}