Added API to add listeners to subscribe to Error, Connect and Disconnect events on RemoteClient
This commit is contained in:
parent
784ca9e750
commit
e4e96f61c8
4 changed files with 47 additions and 18 deletions
|
|
@ -21,7 +21,7 @@ import org.jboss.netty.handler.timeout.ReadTimeoutHandler
|
|||
import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer}
|
||||
|
||||
import java.net.{SocketAddress, InetSocketAddress}
|
||||
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
|
||||
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet}
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import scala.collection.mutable.{HashSet, HashMap}
|
||||
|
|
@ -36,6 +36,11 @@ object RemoteRequestIdFactory {
|
|||
def nextId: Long = id.getAndIncrement + nodeId
|
||||
}
|
||||
|
||||
sealed trait RemoteClientLifeCycleEvent
|
||||
case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -143,7 +148,7 @@ object RemoteClient extends Logging {
|
|||
actorsFor(RemoteServer.Address(hostname, port)) += uuid
|
||||
}
|
||||
|
||||
// TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback
|
||||
// TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback
|
||||
private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized {
|
||||
val set = actorsFor(RemoteServer.Address(hostname, port))
|
||||
set -= uuid
|
||||
|
|
@ -164,12 +169,13 @@ object RemoteClient extends Logging {
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteClient(hostname: String, port: Int) extends Logging {
|
||||
class RemoteClient(val hostname: String, val port: Int) extends Logging {
|
||||
val name = "RemoteClient@" + hostname + "::" + port
|
||||
|
||||
@volatile private[remote] var isRunning = false
|
||||
private val futures = new ConcurrentHashMap[Long, CompletableFuture]
|
||||
private val supervisors = new ConcurrentHashMap[String, Actor]
|
||||
private[remote] val listeners = new ConcurrentSkipListSet[Actor]
|
||||
|
||||
private val channelFactory = new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool,
|
||||
|
|
@ -193,7 +199,10 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
// Wait until the connection attempt succeeds or fails.
|
||||
val channel = connection.awaitUninterruptibly.getChannel
|
||||
openChannels.add(channel)
|
||||
if (!connection.isSuccess) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
if (!connection.isSuccess) {
|
||||
listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(connection.getCause))
|
||||
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
}
|
||||
isRunning = true
|
||||
}
|
||||
}
|
||||
|
|
@ -221,7 +230,11 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
Some(futureResult)
|
||||
}
|
||||
}
|
||||
} else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
|
||||
} else {
|
||||
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
|
||||
listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception))
|
||||
throw exception
|
||||
}
|
||||
|
||||
def registerSupervisorForActor(actor: Actor) =
|
||||
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
|
||||
|
|
@ -231,7 +244,9 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
|
||||
else supervisors.remove(actor._supervisor.get.uuid)
|
||||
|
||||
def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid)
|
||||
def registerListener(actor: Actor) = listeners.add(actor)
|
||||
|
||||
def deregisterListener(actor: Actor) = listeners.remove(actor)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -276,7 +291,6 @@ class RemoteClientHandler(val name: String,
|
|||
val timer: HashedWheelTimer,
|
||||
val client: RemoteClient)
|
||||
extends SimpleChannelUpstreamHandler with Logging {
|
||||
import Actor.Sender.Self
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
|
|
@ -299,17 +313,24 @@ class RemoteClientHandler(val name: String,
|
|||
} else {
|
||||
if (reply.hasSupervisorUuid()) {
|
||||
val supervisorUuid = reply.getSupervisorUuid
|
||||
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
|
||||
if (!supervisors.containsKey(supervisorUuid))
|
||||
throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
|
||||
val supervisedActor = supervisors.get(supervisorUuid)
|
||||
if (!supervisedActor._supervisor.isDefined) throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
if (!supervisedActor._supervisor.isDefined)
|
||||
throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
else supervisedActor._supervisor.get ! Exit(supervisedActor, parseException(reply))
|
||||
}
|
||||
future.completeWithException(null, parseException(reply))
|
||||
}
|
||||
futures.remove(reply.getId)
|
||||
} else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result)
|
||||
} else {
|
||||
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
|
||||
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception))
|
||||
throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(e))
|
||||
log.error("Unexpected exception in remote client handler: %s", e)
|
||||
throw e
|
||||
}
|
||||
|
|
@ -323,18 +344,26 @@ class RemoteClientHandler(val name: String,
|
|||
|
||||
// Wait until the connection attempt succeeds or fails.
|
||||
client.connection.awaitUninterruptibly
|
||||
if (!client.connection.isSuccess) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
|
||||
if (!client.connection.isSuccess) {
|
||||
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(client.connection.getCause))
|
||||
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
|
||||
}
|
||||
}
|
||||
}, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientConnected(client.hostname, client.port))
|
||||
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) =
|
||||
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress);
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientDisconnected(client.hostname, client.port))
|
||||
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(event.getCause))
|
||||
log.error(event.getCause, "Unexpected exception from downstream in remote client")
|
||||
event.getChannel.close
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue