refactored remote event handler and added deregistration of it on remote shutdown

This commit is contained in:
Jonas Bonér 2011-03-24 12:48:40 +01:00
parent cb0f14a2c0
commit 6449fa940f
3 changed files with 22 additions and 17 deletions

View file

@ -1,12 +1,11 @@
package akka.remoteinterface
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.remote
import akka.actor.Actor
import akka.event.EventHandler
import akka.remoteinterface._
/**
* Remote client and server event listener that pipes the events to the standard Akka EventHander.
@ -15,7 +14,6 @@ import akka.remoteinterface._
*/
class RemoteEventHandler extends Actor {
import EventHandler._
println("((((((((((((((( REMOTE EVENT")
self.id = ID
self.dispatcher = EventHandlerDispatcher

View file

@ -5,14 +5,15 @@
package akka.remoteinterface
import akka.japi.Creator
import java.net.InetSocketAddress
import akka.actor._
import akka.util._
import akka.dispatch.CompletableFuture
import akka.config.Config.{config, TIME_UNIT}
import java.util.concurrent.ConcurrentHashMap
import akka.AkkaException
import reflect.BeanProperty
import scala.reflect.BeanProperty
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
trait RemoteModule {
val UUID_PREFIX = "uuid:"
@ -20,7 +21,6 @@ trait RemoteModule {
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: => Any): Unit
private[akka] def actors: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef]
@ -28,7 +28,6 @@ trait RemoteModule {
private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef]
private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef]
/** Lookup methods **/
private[akka] def findActorById(id: String) : ActorRef = actors.get(id)
@ -126,13 +125,21 @@ case class UnparsableException private[akka] (originalClassName: String,
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
lazy val eventHandler: ActorRef = {
val handler = Actor.actorOf[RemoteEventHandler].start
// add the remote client and server listener that pipes the events to the event handler system
addListener(handler)
handler
}
def shutdown {
removeListener(eventHandler)
this.shutdownClientModule
this.shutdownServerModule
clear
}
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor

View file

@ -36,7 +36,8 @@ import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap }
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import akka.remote.{RemoteEventHandler, MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.remoteinterface.{RemoteEventHandler}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -209,12 +210,13 @@ abstract class RemoteClient private[akka] (
if (isRunning) {
if (request.getOneWay) {
txLog.add(request)
val future = currentChannel.write(RemoteEncoder.encode(request))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause
}
} else
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
@ -486,11 +488,9 @@ class ActiveRemoteClientHandler(
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule {
//Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
// add the remote client and server listener that pipes the events to the event handler system
addListener(Actor.actorOf[RemoteEventHandler].start)
// Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get