From 6449fa940f64e523e9a3a2919a38d70164724651 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Mar 2011 12:48:40 +0100 Subject: [PATCH] refactored remote event handler and added deregistration of it on remote shutdown --- .../remoteinterface}/RemoteEventHandler.scala | 6 ++---- .../remoteinterface/RemoteInterface.scala | 21 ++++++++++++------- .../remote/netty/NettyRemoteSupport.scala | 12 +++++------ 3 files changed, 22 insertions(+), 17 deletions(-) rename {akka-remote/src/main/scala/akka/remote => akka-actor/src/main/scala/akka/remoteinterface}/RemoteEventHandler.scala (96%) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala similarity index 96% rename from akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala rename to akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala index 6d3535330d..c3ad4d9c79 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala @@ -1,12 +1,11 @@ +package akka.remoteinterface + /** * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.remote - import akka.actor.Actor import akka.event.EventHandler -import akka.remoteinterface._ /** * Remote client and server event listener that pipes the events to the standard Akka EventHander. @@ -15,7 +14,6 @@ import akka.remoteinterface._ */ class RemoteEventHandler extends Actor { import EventHandler._ - println("((((((((((((((( REMOTE EVENT") self.id = ID self.dispatcher = EventHandlerDispatcher diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 62dcc422ee..3fd26bbfbb 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -5,14 +5,15 @@ package akka.remoteinterface import akka.japi.Creator -import java.net.InetSocketAddress import akka.actor._ import akka.util._ import akka.dispatch.CompletableFuture -import akka.config.Config.{config, TIME_UNIT} -import java.util.concurrent.ConcurrentHashMap import akka.AkkaException -import reflect.BeanProperty + +import scala.reflect.BeanProperty + +import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentHashMap trait RemoteModule { val UUID_PREFIX = "uuid:" @@ -20,7 +21,6 @@ trait RemoteModule { def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope protected[akka] def notifyListeners(message: => Any): Unit - private[akka] def actors: ConcurrentHashMap[String, ActorRef] private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef] @@ -28,7 +28,6 @@ trait RemoteModule { private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef] private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef] - /** Lookup methods **/ private[akka] def findActorById(id: String) : ActorRef = actors.get(id) @@ -126,13 +125,21 @@ case class UnparsableException private[akka] (originalClassName: String, abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { + + lazy val eventHandler: ActorRef = { + val handler = Actor.actorOf[RemoteEventHandler].start + // add the remote client and server listener that pipes the events to the event handler system + addListener(handler) + handler + } + def shutdown { + removeListener(eventHandler) this.shutdownClientModule this.shutdownServerModule clear } - /** * Creates a Client-managed ActorRef out of the Actor of the specified Class. * If the supplied host and port is identical of the configured local node, it will be a local actor diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index dabd0c5a3e..2f4579cc36 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -36,7 +36,8 @@ import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} -import akka.remote.{RemoteEventHandler, MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import akka.remoteinterface.{RemoteEventHandler} +import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -209,12 +210,13 @@ abstract class RemoteClient private[akka] ( if (isRunning) { if (request.getOneWay) { + txLog.add(request) val future = currentChannel.write(RemoteEncoder.encode(request)) future.awaitUninterruptibly() if (!future.isCancelled && !future.isSuccess) { notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) throw future.getCause - } + } else None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -486,11 +488,9 @@ class ActiveRemoteClientHandler( * Provides the implementation of the Netty remote support */ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule { - //Needed for remote testing and switching on/off under run - val optimizeLocal = new AtomicBoolean(true) - // add the remote client and server listener that pipes the events to the event handler system - addListener(Actor.actorOf[RemoteEventHandler].start) + // Needed for remote testing and switching on/off under run + val optimizeLocal = new AtomicBoolean(true) def optimizeLocalScoped_?() = optimizeLocal.get