diff --git a/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala b/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala new file mode 100644 index 0000000000..6d3535330d --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteEventHandler.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.remote + +import akka.actor.Actor +import akka.event.EventHandler +import akka.remoteinterface._ + +/** + * Remote client and server event listener that pipes the events to the standard Akka EventHander. + * + * @author Jonas Bonér + */ +class RemoteEventHandler extends Actor { + import EventHandler._ + println("((((((((((((((( REMOTE EVENT") + + self.id = ID + self.dispatcher = EventHandlerDispatcher + + def receive = { + + // client + case RemoteClientError(cause, client, address) => EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString) + case RemoteClientWriteFailed(request, cause, client, address) => EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(address.toString)) + case RemoteClientDisconnected(client, address) => EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString) + case RemoteClientConnected(client, address) => EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString) + case RemoteClientStarted(client, address) => EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString) + case RemoteClientShutdown(client, address) => EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString) + + // server + case RemoteServerError(cause, server) => EventHandler.error(cause, server, "RemoteServerError") + case RemoteServerWriteFailed(request, cause, server, clientAddress) => EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString)) + case RemoteServerStarted(server) => EventHandler.info(server, "RemoteServerStarted") + case RemoteServerShutdown(server) => EventHandler.info(server, "RemoteServerShutdown") + case RemoteServerClientConnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString) + case RemoteServerClientDisconnected(server, clientAddress) => EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString) + case RemoteServerClientClosed(server, clientAddress) => EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString) + + case _ => //ignore other + } +} + + diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b515b6706f..dabd0c5a3e 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -7,44 +7,36 @@ package akka.remote.netty import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future} import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ -import akka.config.ConfigurationException import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization._ -import akka.japi.Creator -import akka.config.Config._ import akka.remoteinterface._ -import akka.actor.{PoisonPill, Index, - ActorInitializationException, LocalActorRef, newUuid, - ActorRegistry, Actor, RemoteActorRef, +import akka.actor.{PoisonPill, Index, LocalActorRef, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} -import akka.AkkaException -import akka.event.EventHandler import akka.actor.Actor._ import akka.util._ -import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} +import akka.event.EventHandler import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup,ChannelGroupFuture} import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap} +import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap} import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } -import org.jboss.netty.handler.ssl.SslHandler -import scala.collection.mutable.{ HashMap } -import scala.reflect.BeanProperty +import scala.collection.mutable.HashMap -import java.net.{ SocketAddress, InetSocketAddress } +import java.net.InetSocketAddress import java.lang.reflect.InvocationTargetException -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet } -import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicBoolean} +import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap } +import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} +import akka.remote.{RemoteEventHandler, MessageSerializer, RemoteClientSettings, RemoteServerSettings} object RemoteEncoder { def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -212,8 +204,9 @@ abstract class RemoteClient private[akka] ( * Sends the message across the wire */ def send[T]( - request: RemoteMessageProtocol, - senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + request: RemoteMessageProtocol, + senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + if (isRunning) { if (request.getOneWay) { val future = currentChannel.write(RemoteEncoder.encode(request)) @@ -222,7 +215,6 @@ abstract class RemoteClient private[akka] ( notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) throw future.getCause } - None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -497,6 +489,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with //Needed for remote testing and switching on/off under run val optimizeLocal = new AtomicBoolean(true) + // add the remote client and server listener that pipes the events to the event handler system + addListener(Actor.actorOf[RemoteEventHandler].start) + def optimizeLocalScoped_?() = optimizeLocal.get protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {