2009-06-24 15:12:47 +02:00
|
|
|
/**
|
2009-12-27 16:01:53 +01:00
|
|
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-06-24 15:12:47 +02:00
|
|
|
*/
|
|
|
|
|
|
2009-12-16 23:20:15 +01:00
|
|
|
package se.scalablesolutions.akka.remote
|
2009-06-24 15:12:47 +02:00
|
|
|
|
2009-12-16 23:20:15 +01:00
|
|
|
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
2009-10-22 11:14:36 +02:00
|
|
|
import se.scalablesolutions.akka.actor.{Exit, Actor}
|
2010-03-01 22:03:17 +01:00
|
|
|
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
|
2009-12-30 09:24:10 +01:00
|
|
|
import se.scalablesolutions.akka.util.{UUID, Logging}
|
2010-03-10 22:38:52 +01:00
|
|
|
import se.scalablesolutions.akka.config.Config.config
|
2009-07-18 00:16:32 +02:00
|
|
|
|
2009-07-01 15:29:06 +02:00
|
|
|
import org.jboss.netty.channel._
|
2010-02-15 16:21:26 +01:00
|
|
|
import group.DefaultChannelGroup
|
2009-07-01 15:29:06 +02:00
|
|
|
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
2009-11-24 17:41:08 +01:00
|
|
|
import org.jboss.netty.bootstrap.ClientBootstrap
|
2009-07-18 00:16:32 +02:00
|
|
|
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
|
2009-11-24 17:41:08 +01:00
|
|
|
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
|
2009-07-18 00:16:32 +02:00
|
|
|
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
2009-10-22 11:14:36 +02:00
|
|
|
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
|
|
|
|
|
import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer}
|
2009-06-24 15:12:47 +02:00
|
|
|
|
2009-12-29 14:24:48 +01:00
|
|
|
import java.net.{SocketAddress, InetSocketAddress}
|
2010-04-06 15:07:02 +02:00
|
|
|
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet}
|
2009-12-27 08:24:11 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicLong
|
|
|
|
|
|
2010-02-16 15:39:09 +01:00
|
|
|
import scala.collection.mutable.{HashSet, HashMap}
|
|
|
|
|
|
2009-12-27 22:56:55 +01:00
|
|
|
/**
 * Hands out the ids that are put on outgoing remote requests.
 *
 * An id is the current value of a counter held by this object added to a node id that is
 * obtained once, from `UUID.newUuid`, when the object is initialised.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object RemoteRequestIdFactory {
  private val nodeId = UUID.newUuid
  private val counter = new AtomicLong

  /**
   * Returns the next request id and advances the counter.
   */
  def nextId: Long = {
    val sequence = counter.getAndIncrement
    sequence + nodeId
  }
}
|
|
|
|
|
|
2010-04-06 15:07:02 +02:00
|
|
|
/**
 * Common super type of the events that a remote client sends to its registered listeners.
 */
sealed trait RemoteClientLifeCycleEvent

/**
 * Sent when the remote client hits an error; `cause` is the underlying throwable.
 */
case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent

/**
 * Sent when the channel to `host`:`port` has been disconnected.
 */
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent

/**
 * Sent when the channel to `host`:`port` has been connected.
 */
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
|
|
|
|
|
|
2009-07-23 20:01:37 +02:00
|
|
|
/**
 * Registry of `RemoteClient` connections, keyed by "hostname:port", and factory for the local
 * proxy actors that forward their messages to an actor on a remote node.
 *
 * All access to the two internal maps goes through this object's monitor.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object RemoteClient extends Logging {
  // Both values come from the configuration; the second argument is the fallback.
  // RECONNECT_DELAY is used as milliseconds by RemoteClientHandler.channelClosed.
  val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000)
  val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)

  // "hostname:port" -> client for that address
  private val remoteClients = new HashMap[String, RemoteClient]
  // remote server address -> uuids registered through `register`
  private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]

  // FIXME: simplify overloaded methods when we have Scala 2.8

  /**
   * Proxy for the remote actor of class `className`; the class name doubles as actor id and
   * the request timeout is 5000.
   */
  def actorFor(className: String, hostname: String, port: Int): Actor =
    actorFor(className, className, 5000L, hostname, port)

  /**
   * Proxy for the remote actor `actorId` of class `className` with a request timeout of 5000.
   */
  def actorFor(actorId: String, className: String, hostname: String, port: Int): Actor =
    actorFor(actorId, className, 5000L, hostname, port)

  /**
   * Proxy for the remote actor of class `className`; the class name doubles as actor id.
   */
  def actorFor(className: String, timeout: Long, hostname: String, port: Int): Actor =
    actorFor(className, className, timeout, hostname, port)

  /**
   * Creates and starts a local actor whose mailbox methods are overridden so that every
   * message is wrapped in a `RemoteRequest` and written to the client for `hostname`:`port`.
   *
   * @param actorId   value put in the request's uuid field
   * @param className value put in the request's target field
   * @param timeout   request timeout
   * @param hostname  host of the remote node
   * @param port      port of the remote node
   */
  def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): Actor = {
    new Actor {
      // The client has to be resolved before `start` is invoked: the field would otherwise
      // still be null while the actor is already started, and a failure in `clientFor`
      // would leave a started actor behind.
      val remoteClient = RemoteClient.clientFor(hostname, port)
      start

      /** One-way send: builds the request, attaches the sender details if any, and writes it. */
      override def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
        val requestBuilder = RemoteRequest.newBuilder
          .setId(RemoteRequestIdFactory.nextId)
          .setTarget(className)
          .setTimeout(timeout)
          .setUuid(actorId)
          .setIsActor(true)
          .setIsOneWay(true)
          .setIsEscaped(false)
        sender.foreach { s =>
          requestBuilder.setSourceTarget(s.getClass.getName)
          requestBuilder.setSourceUuid(s.uuid)
          // Fall back to this node's address when the sender has no explicit reply address.
          val (replyHost, replyPort) =
            s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT))
          requestBuilder.setSourceHostname(replyHost)
          requestBuilder.setSourcePort(replyPort)
        }
        RemoteProtocolBuilder.setMessage(message, requestBuilder)
        remoteClient.send(requestBuilder.build, None)
      }

      /**
       * Request-reply send: returns the future the client registered for the request.
       * Throws IllegalStateException if the client returned no future.
       */
      override def postMessageToMailboxAndCreateFutureResultWithTimeout(
        message: Any,
        timeout: Long,
        senderFuture: Option[CompletableFuture]): CompletableFuture = {
        val requestBuilder = RemoteRequest.newBuilder
          .setId(RemoteRequestIdFactory.nextId)
          .setTarget(className)
          .setTimeout(timeout)
          .setUuid(actorId)
          .setIsActor(true)
          .setIsOneWay(false)
          .setIsEscaped(false)
        RemoteProtocolBuilder.setMessage(message, requestBuilder)
        remoteClient.send(requestBuilder.build, senderFuture) match {
          case Some(future) => future
          case None => throw new IllegalStateException("Expected a future from remote call to actor " + toString)
        }
      }

      // Nothing is handled locally; both mailbox methods above forward to the remote node.
      def receive = {case _ => {}}
    }
  }

  /**
   * Returns the client for `hostname`:`port`, creating and connecting it on first use.
   */
  def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))

  /**
   * Returns the client for `address`, creating and connecting it on first use.
   * The new client is only added to the registry after `connect` has returned.
   */
  def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
    val hostname = address.getHostName
    val port = address.getPort
    val hash = hostname + ':' + port
    remoteClients.get(hash) match {
      case Some(existing) => existing
      case None =>
        val client = new RemoteClient(hostname, port)
        client.connect
        remoteClients += hash -> client
        client
    }
  }

  /**
   * Shuts down the client registered for `address`, if there is one, and drops it from the
   * registry.
   */
  def shutdownClientFor(address: InetSocketAddress) = synchronized {
    val hostname = address.getHostName
    val port = address.getPort
    val hash = hostname + ':' + port
    if (remoteClients.contains(hash)) {
      val client = remoteClients(hash)
      client.shutdown
      remoteClients -= hash
    }
  }

  /**
   * Clean-up all open connections.
   */
  def shutdownAll = synchronized {
    remoteClients.foreach({case (addr, client) => client.shutdown})
    remoteClients.clear
  }

  /** Records `uuid` as using the remote server at `hostname`:`port`. */
  private[akka] def register(hostname: String, port: Int, uuid: String) = synchronized {
    actorsFor(RemoteServer.Address(hostname, port)) += uuid
  }

  // TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback
  /**
   * Removes `uuid` from the set for `hostname`:`port` and shuts the client for that address
   * down once the set is empty.
   */
  private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized {
    val set = actorsFor(RemoteServer.Address(hostname, port))
    set -= uuid
    if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
  }

  /**
   * Returns the set of uuids registered for `remoteServerAddress`, creating and storing an
   * empty one if there is none yet (or if the stored value is null).
   *
   * Takes the object's monitor like every other method that touches the maps; the monitor
   * is re-entrant, so the calls from `register` and `unregister` are unaffected.
   */
  private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): HashSet[String] = synchronized {
    remoteActors.get(remoteServerAddress) match {
      case Some(existing) if existing ne null => existing
      case _ =>
        val created = new HashSet[String]
        remoteActors.put(remoteServerAddress, created)
        created
    }
  }
}
|
2009-06-24 15:12:47 +02:00
|
|
|
|
2009-07-23 20:01:37 +02:00
|
|
|
/**
 * Client side of a connection to one remote node.
 *
 * Owns the Netty bootstrap, the channel group and the timer for the connection, keeps the
 * futures of outstanding request-reply calls (keyed by request id), the supervised actors
 * (keyed by supervisor uuid) and the actors that listen for `RemoteClientLifeCycleEvent`s.
 *
 * @param hostname host of the remote node
 * @param port     port of the remote node
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteClient(val hostname: String, val port: Int) extends Logging {
  val name = "RemoteClient@" + hostname + "::" + port

  @volatile private[remote] var isRunning = false
  private val futures = new ConcurrentHashMap[Long, CompletableFuture]
  private val supervisors = new ConcurrentHashMap[String, Actor]
  private[remote] val listeners = new ConcurrentSkipListSet[Actor]

  private val channelFactory = new NioClientSocketChannelFactory(
    Executors.newCachedThreadPool,
    Executors.newCachedThreadPool)

  private val bootstrap = new ClientBootstrap(channelFactory)
  private val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)

  private val timer = new HashedWheelTimer
  private val remoteAddress = new InetSocketAddress(hostname, port)
  private[remote] var connection: ChannelFuture = _

  bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
  bootstrap.setOption("tcpNoDelay", true)
  bootstrap.setOption("keepAlive", true)

  /**
   * Connects to the remote node unless the client is already running, blocking until the
   * attempt has finished. A failed attempt is reported to the listeners and logged; the
   * client is marked as running in either case.
   */
  def connect = synchronized {
    if (!isRunning) {
      connection = bootstrap.connect(remoteAddress)
      log.info("Starting remote client connection to [%s:%s]", hostname, port)
      // Wait until the connection attempt succeeds or fails.
      val channel = connection.awaitUninterruptibly.getChannel
      openChannels.add(channel)
      if (!connection.isSuccess) {
        notifyListeners(RemoteClientError(connection.getCause))
        log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
      }
      isRunning = true
    }
  }

  /**
   * Stops a running client: closes the channels in the channel group, releases the
   * bootstrap's external resources and stops the timer. Does nothing if not running.
   */
  def shutdown = synchronized {
    if (isRunning) {
      isRunning = false
      openChannels.close.awaitUninterruptibly
      bootstrap.releaseExternalResources
      timer.stop
      log.info("%s has been shut down", name)
    }
  }

  /**
   * Writes `request` to the current channel.
   *
   * @param request      the request to send
   * @param senderFuture future to complete with the reply; a new one with the request's
   *                     timeout is created when absent (ignored for one-way requests)
   * @return `None` for a one-way request, otherwise the future registered under the request id
   * @throws IllegalStateException if the client is not running (listeners are notified first)
   */
  def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
    if (request.getIsOneWay) {
      connection.getChannel.write(request)
      None
    } else {
      futures.synchronized {
        val futureResult: CompletableFuture =
          senderFuture.getOrElse(new DefaultCompletableFuture(request.getTimeout))
        // Register before writing so the reply can always find its future.
        futures.put(request.getId, futureResult)
        connection.getChannel.write(request)
        Some(futureResult)
      }
    }
  } else {
    val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
    notifyListeners(RemoteClientError(exception))
    throw exception
  }

  /**
   * Registers `actor` under the uuid of its supervisor, unless an actor is already
   * registered for that uuid.
   *
   * @throws IllegalStateException if `actor` has no supervisor
   */
  def registerSupervisorForActor(actor: Actor) = {
    // Read the supervisor once so the check and the use see the same value.
    val supervisor = actor._supervisor
    if (!supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
    else supervisors.putIfAbsent(supervisor.get.uuid, actor)
  }

  /**
   * Removes the registration stored under the uuid of `actor`'s supervisor.
   *
   * @throws IllegalStateException if `actor` has no supervisor
   */
  def deregisterSupervisorForActor(actor: Actor) = {
    // Read the supervisor once so the check and the use see the same value.
    val supervisor = actor._supervisor
    if (!supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
    else supervisors.remove(supervisor.get.uuid)
  }

  /** Adds `actor` to the listeners that receive this client's life-cycle events. */
  def registerListener(actor: Actor) = listeners.add(actor)

  /** Removes `actor` from the listeners. */
  def deregisterListener(actor: Actor) = listeners.remove(actor)

  /**
   * Sends `event` to every registered listener.
   *
   * Walks the set with its own iterator: `toArray` on a Java collection yields an
   * `Object[]`, which cannot be cast to an array of `Actor`.
   */
  private def notifyListeners(event: RemoteClientLifeCycleEvent): Unit = {
    val iterator = listeners.iterator
    while (iterator.hasNext) iterator.next ! event
  }
}
|
2009-06-24 15:12:47 +02:00
|
|
|
|
2009-07-27 21:21:28 +02:00
|
|
|
/**
 * Builds the Netty channel pipeline used by a `RemoteClient`: read timeout, optional zlib
 * codec, length-field framing, protobuf encoding/decoding and finally the
 * `RemoteClientHandler`. Apart from `name`, the constructor arguments are only passed on to
 * the handler (the timer is also given to the read-timeout handler).
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteClientPipelineFactory(name: String,
                                  futures: ConcurrentMap[Long, CompletableFuture],
                                  supervisors: ConcurrentMap[String, Actor],
                                  bootstrap: ClientBootstrap,
                                  remoteAddress: SocketAddress,
                                  timer: HashedWheelTimer,
                                  client: RemoteClient) extends ChannelPipelineFactory {

  /**
   * Creates a fresh set of handlers and returns them as a static pipeline.
   */
  def getPipeline: ChannelPipeline = {
    // NOTE(review): the two-argument ReadTimeoutHandler constructor appears to take seconds,
    // while READ_TIMEOUT defaults to 10000 -- confirm the intended unit.
    val readTimeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)
    // Frames carry a 4 byte length prefix (stripped on decode) and may be up to 1 MiB long.
    val frameDecoder = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
    val framePrepender = new LengthFieldPrepender(4)
    val protobufDecoder = new ProtobufDecoder(RemoteReply.getDefaultInstance)
    val protobufEncoder = new ProtobufEncoder
    val compression = RemoteServer.COMPRESSION_SCHEME match {
      case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
      //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
      case _ => None
    }
    val handler = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)

    // With compression the codec's decoder/encoder sit in front of the framing handlers.
    val stages: Array[ChannelHandler] = compression match {
      case Some(codec) =>
        Array(readTimeout, codec.decoder, frameDecoder, protobufDecoder, codec.encoder, framePrepender, protobufEncoder, handler)
      case None =>
        Array(readTimeout, frameDecoder, protobufDecoder, framePrepender, protobufEncoder, handler)
    }
    new StaticChannelPipeline(stages: _*)
  }
}
|
|
|
|
|
|
2009-07-27 21:21:28 +02:00
|
|
|
/**
 * Upstream handler at the end of the client pipeline.
 *
 * Completes the future that belongs to an incoming `RemoteReply`, forwards remote failures
 * to the local supervisor, schedules a reconnect when the channel is closed while the
 * client is running, and reports connection events and errors to the client's listeners.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
@ChannelHandler.Sharable
class RemoteClientHandler(val name: String,
                          val futures: ConcurrentMap[Long, CompletableFuture],
                          val supervisors: ConcurrentMap[String, Actor],
                          val bootstrap: ClientBootstrap,
                          val remoteAddress: SocketAddress,
                          val timer: HashedWheelTimer,
                          val client: RemoteClient)
  extends SimpleChannelUpstreamHandler with Logging {

  /**
   * Logs every channel state event except INTEREST_OPS changes, then delegates to the
   * default dispatching of the super class.
   */
  override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
    event match {
      case stateEvent: ChannelStateEvent if stateEvent.getState != ChannelState.INTEREST_OPS =>
        log.debug(event.toString)
      case _ => ()
    }
    super.handleUpstream(ctx, event)
  }

  /**
   * Handles a message from the remote node.
   *
   * A `RemoteReply` completes the future registered under its id, with the result or with
   * the exception rebuilt from the reply; a failed reply that carries a supervisor uuid is
   * also sent to that supervisor as an `Exit`. A reply whose id has no registered future is
   * still processed, only the completion is skipped. Any other message type is an error.
   * Every exception is reported to the listeners once, logged and rethrown.
   */
  override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
    try {
      event.getMessage match {
        case reply: RemoteReply =>
          log.debug("Remote client received RemoteReply[\n%s]", reply.toString)
          // The map returns null when no future is registered under this id.
          val future = futures.get(reply.getId)
          if (reply.getIsSuccessful) {
            val message = RemoteProtocolBuilder.getMessage(reply)
            if (future ne null) future.completeWithResult(message)
          } else {
            // Rebuilt by reflection on first use and shared by the supervisor and the future.
            lazy val cause = parseException(reply)
            if (reply.hasSupervisorUuid()) {
              val supervisorUuid = reply.getSupervisorUuid
              if (!supervisors.containsKey(supervisorUuid))
                throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
              val supervisedActor = supervisors.get(supervisorUuid)
              if (!supervisedActor._supervisor.isDefined)
                throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
              else supervisedActor._supervisor.get ! Exit(supervisedActor, cause)
            }
            if (future ne null) future.completeWithException(null, cause)
          }
          futures.remove(reply.getId)
        case other =>
          // The catch clause below notifies the listeners, so no notification here.
          throw new IllegalArgumentException("Unknown message received in remote client handler: " + other)
      }
    } catch {
      case e: Exception =>
        notifyListeners(RemoteClientError(e))
        log.error("Unexpected exception in remote client handler: %s", e)
        throw e
    }
  }

  /**
   * While the client is running, schedules a reconnect attempt after
   * `RemoteClient.RECONNECT_DELAY` milliseconds. The attempt blocks the timer thread until
   * it has finished; a failure is reported to the listeners and logged.
   */
  override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = if (client.isRunning) {
    timer.newTimeout(new TimerTask() {
      def run(timeout: Timeout) = {
        log.debug("Remote client reconnecting to [%s]", remoteAddress)
        // NOTE(review): unlike RemoteClient.connect, this path does not add the new channel
        // to the client's channel group -- confirm that shutdown still closes it.
        client.connection = bootstrap.connect(remoteAddress)
        // Wait until the connection attempt succeeds or fails.
        client.connection.awaitUninterruptibly
        if (!client.connection.isSuccess) {
          notifyListeners(RemoteClientError(client.connection.getCause))
          log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
        }
      }
    }, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS)
  }

  /** Tells the listeners that the client is connected and logs the remote address. */
  override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
    notifyListeners(RemoteClientConnected(client.hostname, client.port))
    log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
  }

  /** Tells the listeners that the client is disconnected and logs the remote address. */
  override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
    notifyListeners(RemoteClientDisconnected(client.hostname, client.port))
    log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
  }

  /** Reports the error to the listeners, logs it and closes the channel it occurred on. */
  override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
    notifyListeners(RemoteClientError(event.getCause))
    log.error(event.getCause, "Unexpected exception from downstream in remote client")
    event.getChannel.close
  }

  /**
   * Sends `event` to every listener registered on the client.
   *
   * Walks the set with its own iterator: `toArray` on a Java collection yields an
   * `Object[]`, which cannot be cast to an array of `Actor`.
   */
  private def notifyListeners(event: RemoteClientLifeCycleEvent): Unit = {
    val iterator = client.listeners.iterator
    while (iterator.hasNext) iterator.next ! event
  }

  /**
   * Rebuilds the throwable described by the reply's exception string, which has the form
   * "<class name>$<message>": the part before the first '$' is loaded as a class and
   * instantiated through its single-String constructor with the remainder as argument.
   */
  private def parseException(reply: RemoteReply) = {
    val exception = reply.getException
    val separator = exception.indexOf('$')
    val exceptionType = Class.forName(exception.substring(0, separator))
    val exceptionMessage = exception.substring(separator + 1, exception.length)
    exceptionType
      .getConstructor(Array[Class[_]](classOf[String]): _*)
      .newInstance(exceptionMessage).asInstanceOf[Throwable]
  }
}
|