Added a remote event handler that pipes remote server and client events to the standard EventHandler system

This commit is contained in:
Jonas Bonér 2011-03-24 12:28:01 +01:00
parent 03ad1acdd9
commit cb0f14a2c0
2 changed files with 60 additions and 19 deletions

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.remote
import akka.actor.Actor
import akka.event.EventHandler
import akka.remoteinterface._
/**
* Remote client and server event listener that pipes the events to the standard Akka EventHander.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteEventHandler extends Actor {
import EventHandler._
println("((((((((((((((( REMOTE EVENT")
self.id = ID
self.dispatcher = EventHandlerDispatcher
def receive = {
// client
case RemoteClientError(cause, client, address) => EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString)
case RemoteClientWriteFailed(request, cause, client, address) => EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(address.toString))
case RemoteClientDisconnected(client, address) => EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString)
case RemoteClientConnected(client, address) => EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString)
case RemoteClientStarted(client, address) => EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString)
case RemoteClientShutdown(client, address) => EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString)
// server
case RemoteServerError(cause, server) => EventHandler.error(cause, server, "RemoteServerError")
case RemoteServerWriteFailed(request, cause, server, clientAddress) => EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString))
case RemoteServerStarted(server) => EventHandler.info(server, "RemoteServerStarted")
case RemoteServerShutdown(server) => EventHandler.info(server, "RemoteServerShutdown")
case RemoteServerClientConnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientDisconnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientClosed(server, clientAddress) => EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString)
case _ => //ignore other
}
}

View file

@ -7,23 +7,16 @@ package akka.remote.netty
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._ import akka.serialization.RemoteActorSerialization._
import akka.japi.Creator
import akka.config.Config._
import akka.remoteinterface._ import akka.remoteinterface._
import akka.actor.{PoisonPill, Index, import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef,
ActorInitializationException, LocalActorRef, newUuid,
ActorRegistry, Actor, RemoteActorRef,
TypedActor, ActorRef, IllegalActorStateException, TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid, RemoteActorSystemMessage, uuidFrom, Uuid,
Exit, LifeCycleMessage, ActorType => AkkaActorType} Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException
import akka.event.EventHandler
import akka.actor.Actor._ import akka.actor.Actor._
import akka.util._ import akka.util._
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import akka.event.EventHandler
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture} import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture}
@ -36,15 +29,14 @@ import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.{ HashMap } import scala.collection.mutable.HashMap
import scala.reflect.BeanProperty
import java.net.{ SocketAddress, InetSocketAddress } import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap }
import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean} import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import akka.remote.{RemoteEventHandler, MessageSerializer, RemoteClientSettings, RemoteServerSettings}
object RemoteEncoder { object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
@ -214,6 +206,7 @@ abstract class RemoteClient private[akka] (
def send[T]( def send[T](
request: RemoteMessageProtocol, request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) { if (isRunning) {
if (request.getOneWay) { if (request.getOneWay) {
val future = currentChannel.write(RemoteEncoder.encode(request)) val future = currentChannel.write(RemoteEncoder.encode(request))
@ -222,7 +215,6 @@ abstract class RemoteClient private[akka] (
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
throw future.getCause throw future.getCause
} }
None None
} else { } else {
val futureResult = if (senderFuture.isDefined) senderFuture.get val futureResult = if (senderFuture.isDefined) senderFuture.get
@ -497,6 +489,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
//Needed for remote testing and switching on/off under run //Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true) val optimizeLocal = new AtomicBoolean(true)
// add the remote client and server listener that pipes the events to the event handler system
addListener(Actor.actorOf[RemoteEventHandler].start)
def optimizeLocalScoped_?() = optimizeLocal.get def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = { protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {