diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala index 25a5d11207..68cd75d825 100644 --- a/akka-core/src/main/scala/config/Config.scala +++ b/akka-core/src/main/scala/config/Config.scala @@ -12,7 +12,7 @@ import net.lag.configgy.{Configgy, ParseException} * @author Jonas Bonér */ object Config extends Logging { - val VERSION = "0.8" + val VERSION = "0.8.1" // Set Multiverse options for max speed System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false") @@ -53,7 +53,7 @@ object Config extends Logging { log.info("Config loaded from the application classpath.") } catch { case e: ParseException => throw new IllegalStateException( - "\nCan't find 'akka.conf' configuration file." + + "\nCan't find 'akka.conf' configuration file." + "\nOne of the three ways of locating the 'akka.conf' file needs to be defined:" + "\n\t1. Define 'AKKA_HOME' environment variable to the root of the Akka distribution." + "\n\t2. Define the '-Dakka.config=...' system property option." + diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 44ca58a051..72a7f37229 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -21,7 +21,7 @@ import org.jboss.netty.handler.timeout.ReadTimeoutHandler import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer} import java.net.{SocketAddress, InetSocketAddress} -import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} +import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet} import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} @@ -36,6 +36,11 @@ object RemoteRequestIdFactory { def nextId: Long = id.getAndIncrement + nodeId } +sealed trait RemoteClientLifeCycleEvent +case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent + /** * @author Jonas Bonér */ @@ -143,7 +148,7 @@ object RemoteClient extends Logging { actorsFor(RemoteServer.Address(hostname, port)) += uuid } - // TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback + // TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized { val set = actorsFor(RemoteServer.Address(hostname, port)) set -= uuid @@ -164,12 +169,13 @@ object RemoteClient extends Logging { /** * @author Jonas Bonér */ -class RemoteClient(hostname: String, port: Int) extends Logging { +class RemoteClient(val hostname: String, val port: Int) extends Logging { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFuture] private val supervisors = new ConcurrentHashMap[String, Actor] + private[remote] val listeners = new ConcurrentSkipListSet[Actor] private val channelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool, @@ -193,7 +199,10 @@ class RemoteClient(hostname: String, port: Int) extends Logging { // Wait until the connection attempt succeeds or fails. val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) - if (!connection.isSuccess) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) + if (!connection.isSuccess) { + listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(connection.getCause)) + log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) + } isRunning = true } } @@ -221,7 +230,11 @@ class RemoteClient(hostname: String, port: Int) extends Logging { Some(futureResult) } } - } else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") + } else { + val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") + listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception)) + throw exception + } def registerSupervisorForActor(actor: Actor) = if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision") @@ -231,7 +244,9 @@ class RemoteClient(hostname: String, port: Int) extends Logging { if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision") else supervisors.remove(actor._supervisor.get.uuid) - def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid) + def registerListener(actor: Actor) = listeners.add(actor) + + def deregisterListener(actor: Actor) = listeners.remove(actor) } /** @@ -276,7 +291,6 @@ class RemoteClientHandler(val name: String, val timer: HashedWheelTimer, val client: RemoteClient) extends SimpleChannelUpstreamHandler with Logging { - import Actor.Sender.Self override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && @@ -299,17 +313,24 @@ class RemoteClientHandler(val name: String, } else { if (reply.hasSupervisorUuid()) { val supervisorUuid = reply.getSupervisorUuid - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") + if (!supervisors.containsKey(supervisorUuid)) + throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor._supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") + if (!supervisedActor._supervisor.isDefined) + throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") else supervisedActor._supervisor.get ! Exit(supervisedActor, parseException(reply)) } future.completeWithException(null, parseException(reply)) } futures.remove(reply.getId) - } else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result) + } else { + val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) + client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception)) + throw exception + } } catch { case e: Exception => + client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(e)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -323,18 +344,26 @@ class RemoteClientHandler(val name: String, // Wait until the connection attempt succeeds or fails. client.connection.awaitUninterruptibly - if (!client.connection.isSuccess) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) + if (!client.connection.isSuccess) { + client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(client.connection.getCause)) + log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) + } } }, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS) } - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientConnected(client.hostname, client.port)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) + } - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = - log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress); + override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientDisconnected(client.hostname, client.port)) + log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) + } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(event.getCause)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 25981591af..34cc9eaec8 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -15,7 +15,7 @@ - version = "0.8" + version = "0.8.1" # FQN to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor diff --git a/project/build.properties b/project/build.properties index 331a7e755a..c77a586950 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1,6 +1,6 @@ project.organization=se.scalablesolutions.akka project.name=akka -project.version=0.8 +project.version=0.8.1 scala.version=2.8.0.Beta1 sbt.version=0.7.1 def.scala.version=2.7.7