From f2ebd7a2f41d6047af422983aa42f8380fa80265 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Mar 2011 16:23:37 +0100 Subject: [PATCH] 1. Added a 'pending-messages' tx log for all pending messages that are not yet delivered to the remote host, this tx log is retried upon successful remote client reconnect. 2. Fixed broken code in UnparsableException and renamed it to UnknownRemoteException. --- .../src/main/scala/akka/Implicits.scala | 5 +- .../remoteinterface/RemoteInterface.scala | 20 +++-- .../remote/netty/NettyRemoteSupport.scala | 83 ++++++++++++------- 3 files changed, 68 insertions(+), 40 deletions(-) diff --git a/akka-actor/src/main/scala/akka/Implicits.scala b/akka-actor/src/main/scala/akka/Implicits.scala index 6370e1c2fd..0a781649eb 100644 --- a/akka-actor/src/main/scala/akka/Implicits.scala +++ b/akka-actor/src/main/scala/akka/Implicits.scala @@ -14,7 +14,10 @@ package object actor { ref.asInstanceOf[ActorRef] type Uuid = com.eaio.uuid.UUID + def newUuid(): Uuid = new Uuid() - def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode) + + def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode) + def uuidFrom(uuid: String): Uuid = new Uuid(uuid) } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 3fd26bbfbb..b56ed67277 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -14,6 +14,7 @@ import scala.reflect.BeanProperty import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap +import java.io.{PrintWriter, PrintStream} trait RemoteModule { val UUID_PREFIX = "uuid:" @@ -113,16 +114,21 @@ case class RemoteServerWriteFailed( /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. */ -class RemoteClientException private[akka] (message: String, - @BeanProperty val client: RemoteClientModule, - val remoteAddress: InetSocketAddress) extends AkkaException(message) +class RemoteClientException private[akka] ( + message: String, + @BeanProperty val client: RemoteClientModule, + val remoteAddress: InetSocketAddress) extends AkkaException(message) /** - * Returned when a remote exception cannot be instantiated or parsed + * Returned when a remote exception sent over the wire cannot be loaded and instantiated */ -case class UnparsableException private[akka] (originalClassName: String, - originalMessage: String) extends AkkaException(originalMessage) - +case class UnknownRemoteException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) + extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]" + .format(cause.toString, originalClassName, originalMessage)) { + override def printStackTrace = cause.printStackTrace + override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream) + override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter) +} abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 2f4579cc36..ba83c58c0c 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -34,10 +34,10 @@ import scala.collection.mutable.HashMap import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import akka.remoteinterface.{RemoteEventHandler} import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import java.util.concurrent._ object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -157,19 +157,25 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort + val name = this.getClass.getSimpleName + "@" + + remoteAddress.getAddress.getHostAddress + "::" + + remoteAddress.getPort - protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - private[remote] val runSwitch = new Switch() + protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] + protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + + private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) private[remote] def isRunning = runSwitch.isOn protected def notifyListeners(msg: => Any); Unit + protected def currentChannel: Channel def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean + def shutdown: Boolean /** @@ -207,36 +213,19 @@ abstract class RemoteClient private[akka] ( def send[T]( request: RemoteMessageProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { - if (isRunning) { if (request.getOneWay) { - txLog.add(request) - val future = currentChannel.write(RemoteEncoder.encode(request)) - future.awaitUninterruptibly() - if (!future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - throw future.getCause - } else + pendingMessages.add((true, null, request)) + sendPendingMessages() None } else { - val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) - futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - futures.remove(futureUuid) //Clean this up - //We don't care about that right now - } else if (!future.isSuccess) { - val f = futures.remove(futureUuid) //Clean this up - if (f ne null) - f.completeWithException(future.getCause) - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) - Some(futureResult) + val futureResult = if (senderFuture.isDefined) senderFuture.get + else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) + futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails + pendingMessages.add((false, futureUuid, request)) + sendPendingMessages() + Some(futureResult) } } else { val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) @@ -245,6 +234,33 @@ abstract class RemoteClient private[akka] ( } } + private[remote] def sendPendingMessages() = { + var pendingMessage = pendingMessages.peek // try to grab first message + while (pendingMessage ne null) { + val (isOneWay, futureUuid, message) = pendingMessage + if (isOneWay) { // sendOneWay + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + throw future.getCause + } + } else { // sendRequestReply + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (future.isCancelled) { + futures.remove(futureUuid) // Clean up future + } else if (!future.isSuccess) { + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + } + } + pendingMessages.remove(pendingMessage) // message delivered; remove from tx log + pendingMessage = pendingMessages.peek // try to grab next message + } + } + private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( "Can't register supervisor for " + actorRef + " since it is not under supervision") @@ -265,6 +281,7 @@ class ActiveRemoteClient private[akka] ( module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { import RemoteClientSettings._ + //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @volatile private[remote] var connection: ChannelFuture = _ @@ -335,6 +352,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources bootstrap = null connection = null + pendingMessages.clear } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -449,6 +467,7 @@ class ActiveRemoteClientHandler( } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.sendPendingMessages() // try to send pending message (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) client.resetReconnectionTimeWindow } @@ -479,7 +498,7 @@ class ActiveRemoteClientHandler( } catch { case problem: Throwable => EventHandler.error(problem, this, problem.getMessage) - UnparsableException(classname, exception.getMessage) + UnknownRemoteException(problem, classname, exception.getMessage) } } }