diff --git a/akka-actor/src/main/scala/akka/Implicits.scala b/akka-actor/src/main/scala/akka/Implicits.scala index 6370e1c2fd..0a781649eb 100644 --- a/akka-actor/src/main/scala/akka/Implicits.scala +++ b/akka-actor/src/main/scala/akka/Implicits.scala @@ -14,7 +14,10 @@ package object actor { ref.asInstanceOf[ActorRef] type Uuid = com.eaio.uuid.UUID + def newUuid(): Uuid = new Uuid() - def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time,clockSeqAndNode) + + def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode) + def uuidFrom(uuid: String): Uuid = new Uuid(uuid) } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 3fd26bbfbb..b56ed67277 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -14,6 +14,7 @@ import scala.reflect.BeanProperty import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap +import java.io.{PrintWriter, PrintStream} trait RemoteModule { val UUID_PREFIX = "uuid:" @@ -113,16 +114,21 @@ case class RemoteServerWriteFailed( /** * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. */ -class RemoteClientException private[akka] (message: String, - @BeanProperty val client: RemoteClientModule, - val remoteAddress: InetSocketAddress) extends AkkaException(message) +class RemoteClientException private[akka] ( + message: String, + @BeanProperty val client: RemoteClientModule, + val remoteAddress: InetSocketAddress) extends AkkaException(message) /** - * Returned when a remote exception cannot be instantiated or parsed + * Returned when a remote exception sent over the wire cannot be loaded and instantiated */ -case class UnparsableException private[akka] (originalClassName: String, - originalMessage: String) extends AkkaException(originalMessage) - +case class UnknownRemoteException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) + extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]" + .format(cause.toString, originalClassName, originalMessage)) { + override def printStackTrace = cause.printStackTrace + override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream) + override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter) +} abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 2f4579cc36..ba83c58c0c 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -34,10 +34,10 @@ import scala.collection.mutable.HashMap import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import akka.remoteinterface.{RemoteEventHandler} import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import java.util.concurrent._ object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -157,19 +157,25 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { - val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort + val name = this.getClass.getSimpleName + "@" + + remoteAddress.getAddress.getHostAddress + "::" + + remoteAddress.getPort - protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] - protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] - private[remote] val runSwitch = new Switch() + protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + protected val supervisors = new ConcurrentHashMap[Uuid, ActorRef] + protected val pendingMessages = new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] + + private[remote] val runSwitch = new Switch() private[remote] val isAuthenticated = new AtomicBoolean(false) private[remote] def isRunning = runSwitch.isOn protected def notifyListeners(msg: => Any); Unit + protected def currentChannel: Channel def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean + def shutdown: Boolean /** @@ -207,36 +213,19 @@ abstract class RemoteClient private[akka] ( def send[T]( request: RemoteMessageProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { - if (isRunning) { if (request.getOneWay) { - txLog.add(request) - val future = currentChannel.write(RemoteEncoder.encode(request)) - future.awaitUninterruptibly() - if (!future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - throw future.getCause - } else + pendingMessages.add((true, null, request)) + sendPendingMessages() None } else { - val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) - val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) - futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - futures.remove(futureUuid) //Clean this up - //We don't care about that right now - } else if (!future.isSuccess) { - val f = futures.remove(futureUuid) //Clean this up - if (f ne null) - f.completeWithException(future.getCause) - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) - Some(futureResult) + val futureResult = if (senderFuture.isDefined) senderFuture.get + else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) + futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails + pendingMessages.add((false, futureUuid, request)) + sendPendingMessages() + Some(futureResult) } } else { val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) @@ -245,6 +234,33 @@ abstract class RemoteClient private[akka] ( } } + private[remote] def sendPendingMessages() = { + var pendingMessage = pendingMessages.peek // try to grab first message + while (pendingMessage ne null) { + val (isOneWay, futureUuid, message) = pendingMessage + if (isOneWay) { // sendOneWay + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + throw future.getCause + } + } else { // sendRequestReply + val future = currentChannel.write(RemoteEncoder.encode(message)) + future.awaitUninterruptibly() + if (future.isCancelled) { + futures.remove(futureUuid) // Clean up future + } else if (!future.isSuccess) { + val f = futures.remove(futureUuid) // Clean up future + if (f ne null) f.completeWithException(future.getCause) + notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) + } + } + pendingMessages.remove(pendingMessage) // message delivered; remove from tx log + pendingMessage = pendingMessages.peek // try to grab next message + } + } + private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef = if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException( "Can't register supervisor for " + actorRef + " since it is not under supervision") @@ -265,6 +281,7 @@ class ActiveRemoteClient private[akka] ( module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { import RemoteClientSettings._ + //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @volatile private[remote] var connection: ChannelFuture = _ @@ -335,6 +352,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.releaseExternalResources bootstrap = null connection = null + pendingMessages.clear } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -449,6 +467,7 @@ class ActiveRemoteClientHandler( } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.sendPendingMessages() // try to send pending message (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) client.resetReconnectionTimeWindow } @@ -479,7 +498,7 @@ class ActiveRemoteClientHandler( } catch { case problem: Throwable => EventHandler.error(problem, this, problem.getMessage) - UnparsableException(classname, exception.getMessage) + UnknownRemoteException(problem, classname, exception.getMessage) } } }