Added missing events to RemoteServer Listener API

This commit is contained in:
Jonas Bonér 2010-08-21 16:37:33 +02:00
commit 70d61b118a
5 changed files with 44 additions and 78 deletions

View file

@ -1,30 +1,19 @@
/** /**
* Copyright (C) 2010, Progress Software Corporation and/or its * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* subsidiaries or affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package se.scalablesolutions.akka.dispatch package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.ActorRef import se.scalablesolutions.akka.actor.ActorRef
import org.fusesource.hawtdispatch.DispatchQueue import org.fusesource.hawtdispatch.DispatchQueue
import org.fusesource.hawtdispatch.ScalaDispatch._ import org.fusesource.hawtdispatch.ScalaDispatch._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.CountDownLatch
import org.fusesource.hawtdispatch.DispatchQueue.QueueType import org.fusesource.hawtdispatch.DispatchQueue.QueueType
import org.fusesource.hawtdispatch.ListEventAggregator import org.fusesource.hawtdispatch.ListEventAggregator
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.util.concurrent.CountDownLatch
/** /**
* Holds helper methods for working with actors that are using * Holds helper methods for working with actors that are using
* a HawtDispatcher as it's dispatcher. * a HawtDispatcher as it's dispatcher.
@ -44,7 +33,6 @@ object HawtDispatcher {
} catch { } catch {
case _ => case _ =>
} }
println("done");
} }
}.start() }.start()
} }
@ -71,7 +59,6 @@ object HawtDispatcher {
mailbox(actorRef).queue mailbox(actorRef).queue
} }
/** /**
* <p> * <p>
* Pins an actor to a random thread queue. Once pinned the actor will always execute * Pins an actor to a random thread queue. Once pinned the actor will always execute
@ -108,7 +95,6 @@ object HawtDispatcher {
target(actorRef, globalQueue) target(actorRef, globalQueue)
} }
/** /**
* @return true if the actor was pinned to a thread. * @return true if the actor was pinned to a thread.
*/ */
@ -212,7 +198,6 @@ class HawtDispatcher(val aggregate:Boolean=true, val parent:DispatchQueue=global
} }
override def toString = "HawtDispatchEventDrivenDispatcher" override def toString = "HawtDispatchEventDrivenDispatcher"
} }
class HawtDispatcherMailbox(val queue:DispatchQueue) { class HawtDispatcherMailbox(val queue:DispatchQueue) {

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef, IllegalActorStateException} import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{ListenerManagement, UUID, Logging, Duration}
import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.AkkaException import se.scalablesolutions.akka.AkkaException
@ -27,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{HashSet, HashMap} import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.BeanProperty import scala.reflect.BeanProperty
import se.scalablesolutions.akka.util.{ListenerManagement, UUID, Logging, Duration}
/** /**
* Atomic remote request/reply message id generator. * Atomic remote request/reply message id generator.

View file

@ -24,9 +24,8 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder} import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
import org.jboss.netty.handler.ssl.SslHandler import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map import scala.collection.mutable.Map
import reflect.BeanProperty import scala.reflect.BeanProperty
/** /**
* Use this object if you need a single remote server on a specific node. * Use this object if you need a single remote server on a specific node.
@ -165,23 +164,11 @@ object RemoteServer {
* Life-cycle events for RemoteServer. * Life-cycle events for RemoteServer.
*/ */
sealed trait RemoteServerLifeCycleEvent sealed trait RemoteServerLifeCycleEvent
case class RemoteServerError( case class RemoteServerError(@BeanProperty val cause: Throwable, @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
@BeanProperty val cause: Throwable, case class RemoteServerShutdown(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
@BeanProperty val host: String, case class RemoteServerStarted(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent case class RemoteServerClientConnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
case class RemoteServerShutdown( case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
@BeanProperty val host: String,
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent
case class RemoteServerStarted(
@BeanProperty val host: String,
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent
/*FIXME NOT SUPPORTED YET
case class RemoteServerClientConnected(
@BeanProperty val host: String,
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent
case class RemoteServerClientDisconnected(
@BeanProperty val host: String,
@BeanProperty val port: Int) extends RemoteServerLifeCycleEvent*/
/** /**
* Use this class if you need a more than one remote server on a specific node. * Use this class if you need a more than one remote server on a specific node.
@ -254,12 +241,12 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
_isRunning = true _isRunning = true
Cluster.registerLocalNode(hostname, port) Cluster.registerLocalNode(hostname, port)
foreachListener(_ ! RemoteServerStarted(hostname,port)) foreachListener(_ ! RemoteServerStarted(this))
} }
} catch { } catch {
case e => case e =>
log.error(e, "Could not start up remote server") log.error(e, "Could not start up remote server")
foreachListener(_ ! RemoteServerError(e,hostname,port)) foreachListener(_ ! RemoteServerError(e, this))
} }
this this
} }
@ -272,7 +259,7 @@ class RemoteServer extends Logging with ListenerManagement {
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port) Cluster.deregisterLocalNode(hostname, port)
foreachListener(_ ! RemoteServerShutdown(hostname,port)) foreachListener(_ ! RemoteServerShutdown(this))
} catch { } catch {
case e: java.nio.channels.ClosedChannelException => {} case e: java.nio.channels.ClosedChannelException => {}
case e => log.warning("Could not close remote server channel in a graceful way") case e => log.warning("Could not close remote server channel in a graceful way")
@ -407,24 +394,30 @@ class RemoteServerHandler(
* ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer. * ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
* If a channel is closed before, it is automatically removed from the open channels group. * If a channel is closed before, it is automatically removed from the open channels group.
*/ */
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) { override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)
openChannels.add(ctx.getChannel)
}
override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
log.debug("Remote client connected to [%s]", server.name)
if (RemoteServer.SECURE) { if (RemoteServer.SECURE) {
val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
// Begin handshake. // Begin handshake.
sslHandler.handshake().addListener( new ChannelFutureListener { sslHandler.handshake().addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = { def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) openChannels.add(future.getChannel) if (future.isSuccess) {
else future.getChannel.close openChannels.add(future.getChannel)
server.foreachListener(_ ! RemoteServerClientConnected(server))
} else future.getChannel.close
} }
}) })
} }
} }
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
log.debug("Remote client disconnected from [%s]", server.name)
server.foreachListener(_ ! RemoteServerClientDisconnected(server))
}
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
@ -444,7 +437,7 @@ class RemoteServerHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.error(event.getCause, "Unexpected exception from remote downstream") log.error(event.getCause, "Unexpected exception from remote downstream")
event.getChannel.close event.getChannel.close
server.foreachListener(_ ! RemoteServerError(event.getCause,server.hostname,server.port)) server.foreachListener(_ ! RemoteServerError(event.getCause, server))
} }
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
@ -489,7 +482,7 @@ class RemoteServerHandler(
} catch { } catch {
case e: Throwable => case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, true)) channel.write(createErrorReplyMessage(e, request, true))
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) server.foreachListener(_ ! RemoteServerError(e, server))
} }
} }
} }
@ -521,10 +514,10 @@ class RemoteServerHandler(
} catch { } catch {
case e: InvocationTargetException => case e: InvocationTargetException =>
channel.write(createErrorReplyMessage(e.getCause, request, false)) channel.write(createErrorReplyMessage(e.getCause, request, false))
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) server.foreachListener(_ ! RemoteServerError(e, server))
case e: Throwable => case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, false)) channel.write(createErrorReplyMessage(e, request, false))
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) server.foreachListener(_ ! RemoteServerError(e, server))
} }
} }
@ -556,7 +549,7 @@ class RemoteServerHandler(
} catch { } catch {
case e => case e =>
log.error(e, "Could not create remote actor instance") log.error(e, "Could not create remote actor instance")
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) server.foreachListener(_ ! RemoteServerError(e, server))
throw e throw e
} }
} else actorRefOrNull } else actorRefOrNull
@ -586,7 +579,7 @@ class RemoteServerHandler(
} catch { } catch {
case e => case e =>
log.error(e, "Could not create remote typed actor instance") log.error(e, "Could not create remote typed actor instance")
server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) server.foreachListener(_ ! RemoteServerError(e, server))
throw e throw e
} }
} else typedActorOrNull } else typedActorOrNull

View file

@ -1,11 +1,13 @@
package se.scalablesolutions.akka.actor.dispatch package se.scalablesolutions.akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite import org.scalatest.junit.JUnitSuite
import org.junit.Test import org.junit.Test
import se.scalablesolutions.akka.dispatch.{HawtDispatcher, Dispatchers}
import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.actor.Actor
import Actor._ import Actor._
import se.scalablesolutions.akka.dispatch.{HawtDispatcher, Dispatchers}
object HawtDispatcherActorSpec { object HawtDispatcherActorSpec {
class TestActor extends Actor { class TestActor extends Actor {

View file

@ -1,22 +1,11 @@
/** /**
* Copyright (C) 2010, Progress Software Corporation and/or its * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* subsidiaries or affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package se.scalablesolutions.akka.actor.dispatch package se.scalablesolutions.akka.actor.dispatch
import collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.io.IOException import java.io.IOException
@ -26,13 +15,13 @@ import java.nio.channels.{SocketChannel, SelectionKey, ServerSocketChannel}
import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.HawtDispatcher import se.scalablesolutions.akka.dispatch.HawtDispatcher
import org.fusesource.hawtdispatch.DispatchSource import org.fusesource.hawtdispatch.DispatchSource
import org.fusesource.hawtdispatch.ScalaDispatch._ import org.fusesource.hawtdispatch.ScalaDispatch._
/** /**
* This is an example of how to crate an Akka actor based TCP echo server using * This is an example of how to crate an Akka actor based TCP echo server using
* the HawtDispatch dispatcher and NIO event sources. * the HawtDispatch dispatcher and NIO event sources.
*
*/ */
object HawtDispatcherEchoServer { object HawtDispatcherEchoServer {
@ -203,7 +192,6 @@ object HawtDispatcherEchoServer {
} }
} }
def close() = { def close() = {
if( !closed ) { if( !closed ) {
closed = true closed = true
@ -215,7 +203,5 @@ object HawtDispatcherEchoServer {
case DisplayStats => case DisplayStats =>
println("connection to %s reads: %,d bytes, writes: %,d".format(remote_address, readCounter, writeCounter)) println("connection to %s reads: %,d bytes, writes: %,d".format(remote_address, readCounter, writeCounter))
} }
} }
} }