diff --git a/akka-core/src/main/scala/dispatch/HawtDispatcher.scala b/akka-core/src/main/scala/dispatch/HawtDispatcher.scala index 45e4468b3d..e0ddf05d26 100644 --- a/akka-core/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/HawtDispatcher.scala @@ -1,30 +1,19 @@ /** - * Copyright (C) 2010, Progress Software Corporation and/or its - * subsidiaries or affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright (C) 2009-2010 Scalable Solutions AB */ + package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.ActorRef + import org.fusesource.hawtdispatch.DispatchQueue import org.fusesource.hawtdispatch.ScalaDispatch._ -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.CountDownLatch import org.fusesource.hawtdispatch.DispatchQueue.QueueType import org.fusesource.hawtdispatch.ListEventAggregator +import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} +import java.util.concurrent.CountDownLatch + /** * Holds helper methods for working with actors that are using * a HawtDispatcher as it's dispatcher. @@ -44,7 +33,6 @@ object HawtDispatcher { } catch { case _ => } - println("done"); } }.start() } @@ -71,7 +59,6 @@ object HawtDispatcher { mailbox(actorRef).queue } - /** *

* Pins an actor to a random thread queue. Once pinned the actor will always execute @@ -108,7 +95,6 @@ object HawtDispatcher { target(actorRef, globalQueue) } - /** * @return true if the actor was pinned to a thread. */ @@ -212,7 +198,6 @@ class HawtDispatcher(val aggregate:Boolean=true, val parent:DispatchQueue=global } override def toString = "HawtDispatchEventDrivenDispatcher" - } class HawtDispatcherMailbox(val queue:DispatchQueue) { diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 5d5233c2a7..d731a07d57 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef, IllegalActorStateException} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} +import se.scalablesolutions.akka.util.{ListenerManagement, UUID, Logging, Duration} import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.AkkaException @@ -27,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} import scala.reflect.BeanProperty -import se.scalablesolutions.akka.util.{ListenerManagement, UUID, Logging, Duration} /** * Atomic remote request/reply message id generator. diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 95d016094d..c28c0b5a0d 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -24,9 +24,8 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder} import org.jboss.netty.handler.ssl.SslHandler - import scala.collection.mutable.Map -import reflect.BeanProperty +import scala.reflect.BeanProperty /** * Use this object if you need a single remote server on a specific node. @@ -165,23 +164,11 @@ object RemoteServer { * Life-cycle events for RemoteServer. */ sealed trait RemoteServerLifeCycleEvent -case class RemoteServerError( - @BeanProperty val cause: Throwable, - @BeanProperty val host: String, - @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent -case class RemoteServerShutdown( - @BeanProperty val host: String, - @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent -case class RemoteServerStarted( - @BeanProperty val host: String, - @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent -/*FIXME NOT SUPPORTED YET - case class RemoteServerClientConnected( - @BeanProperty val host: String, - @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent -case class RemoteServerClientDisconnected( - @BeanProperty val host: String, - @BeanProperty val port: Int) extends RemoteServerLifeCycleEvent*/ +case class RemoteServerError(@BeanProperty val cause: Throwable, @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerShutdown(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerStarted(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerClientConnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent +case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent /** * Use this class if you need a more than one remote server on a specific node. @@ -254,12 +241,12 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) _isRunning = true Cluster.registerLocalNode(hostname, port) - foreachListener(_ ! RemoteServerStarted(hostname,port)) + foreachListener(_ ! RemoteServerStarted(this)) } } catch { case e => log.error(e, "Could not start up remote server") - foreachListener(_ ! RemoteServerError(e,hostname,port)) + foreachListener(_ ! RemoteServerError(e, this)) } this } @@ -272,7 +259,7 @@ class RemoteServer extends Logging with ListenerManagement { openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) - foreachListener(_ ! RemoteServerShutdown(hostname,port)) + foreachListener(_ ! RemoteServerShutdown(this)) } catch { case e: java.nio.channels.ClosedChannelException => {} case e => log.warning("Could not close remote server channel in a graceful way") @@ -407,24 +394,30 @@ class RemoteServerHandler( * ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer. * If a channel is closed before, it is automatically removed from the open channels group. */ - override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) { - openChannels.add(ctx.getChannel) - } + override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel) - override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + log.debug("Remote client connected to [%s]", server.name) if (RemoteServer.SECURE) { - val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) + val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) // Begin handshake. - sslHandler.handshake().addListener( new ChannelFutureListener { + sslHandler.handshake().addListener(new ChannelFutureListener { def operationComplete(future: ChannelFuture): Unit = { - if (future.isSuccess) openChannels.add(future.getChannel) - else future.getChannel.close + if (future.isSuccess) { + openChannels.add(future.getChannel) + server.foreachListener(_ ! RemoteServerClientConnected(server)) + } else future.getChannel.close } }) } } + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + log.debug("Remote client disconnected from [%s]", server.name) + server.foreachListener(_ ! RemoteServerClientDisconnected(server)) + } + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { @@ -444,7 +437,7 @@ class RemoteServerHandler( override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close - server.foreachListener(_ ! RemoteServerError(event.getCause,server.hostname,server.port)) + server.foreachListener(_ ! RemoteServerError(event.getCause, server)) } private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { @@ -489,7 +482,7 @@ class RemoteServerHandler( } catch { case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) - server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) + server.foreachListener(_ ! RemoteServerError(e, server)) } } } @@ -521,10 +514,10 @@ class RemoteServerHandler( } catch { case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false)) - server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) + server.foreachListener(_ ! RemoteServerError(e, server)) case e: Throwable => channel.write(createErrorReplyMessage(e, request, false)) - server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) + server.foreachListener(_ ! RemoteServerError(e, server)) } } @@ -556,7 +549,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote actor instance") - server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) + server.foreachListener(_ ! RemoteServerError(e, server)) throw e } } else actorRefOrNull @@ -586,7 +579,7 @@ class RemoteServerHandler( } catch { case e => log.error(e, "Could not create remote typed actor instance") - server.foreachListener(_ ! RemoteServerError(e,server.hostname,server.port)) + server.foreachListener(_ ! RemoteServerError(e, server)) throw e } } else typedActorOrNull diff --git a/akka-core/src/test/scala/dispatch/HawtDispatcherActorSpec.scala b/akka-core/src/test/scala/dispatch/HawtDispatcherActorSpec.scala index dcc8f7eafb..2c45f3388c 100644 --- a/akka-core/src/test/scala/dispatch/HawtDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/dispatch/HawtDispatcherActorSpec.scala @@ -1,11 +1,13 @@ package se.scalablesolutions.akka.actor.dispatch import java.util.concurrent.{CountDownLatch, TimeUnit} + import org.scalatest.junit.JUnitSuite import org.junit.Test + +import se.scalablesolutions.akka.dispatch.{HawtDispatcher, Dispatchers} import se.scalablesolutions.akka.actor.Actor import Actor._ -import se.scalablesolutions.akka.dispatch.{HawtDispatcher, Dispatchers} object HawtDispatcherActorSpec { class TestActor extends Actor { diff --git a/akka-core/src/test/scala/dispatch/HawtDispatcherEchoServer.scala b/akka-core/src/test/scala/dispatch/HawtDispatcherEchoServer.scala index 208433bd4b..97f2e0df9d 100644 --- a/akka-core/src/test/scala/dispatch/HawtDispatcherEchoServer.scala +++ b/akka-core/src/test/scala/dispatch/HawtDispatcherEchoServer.scala @@ -1,22 +1,11 @@ /** - * Copyright (C) 2010, Progress Software Corporation and/or its - * subsidiaries or affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Copyright (C) 2009-2010 Scalable Solutions AB */ + package se.scalablesolutions.akka.actor.dispatch -import collection.mutable.ListBuffer +import scala.collection.mutable.ListBuffer + import java.util.concurrent.TimeUnit import java.net.InetSocketAddress import java.io.IOException @@ -26,13 +15,13 @@ import java.nio.channels.{SocketChannel, SelectionKey, ServerSocketChannel} import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.dispatch.HawtDispatcher + import org.fusesource.hawtdispatch.DispatchSource import org.fusesource.hawtdispatch.ScalaDispatch._ /** * This is an example of how to crate an Akka actor based TCP echo server using * the HawtDispatch dispatcher and NIO event sources. - * */ object HawtDispatcherEchoServer { @@ -203,7 +192,6 @@ object HawtDispatcherEchoServer { } } - def close() = { if( !closed ) { closed = true @@ -215,7 +203,5 @@ object HawtDispatcherEchoServer { case DisplayStats => println("connection to %s reads: %,d bytes, writes: %,d".format(remote_address, readCounter, writeCounter)) } - } - }