Merge remote branch 'origin/0deps' into 0deps
This commit is contained in:
commit
d8c556bb39
56 changed files with 813 additions and 1069 deletions
|
|
@ -16,6 +16,7 @@ import akka.remoteinterface._
|
|||
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
||||
import akka.AkkaException
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{EventHandler}
|
||||
import akka.util._
|
||||
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
|
||||
|
||||
|
|
@ -38,7 +39,7 @@ import scala.reflect.BeanProperty
|
|||
import java.lang.reflect.InvocationTargetException
|
||||
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
|
||||
|
||||
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging =>
|
||||
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement =>
|
||||
private val remoteClients = new HashMap[Address, RemoteClient]
|
||||
private val remoteActors = new Index[Address, Uuid]
|
||||
private val lock = new ReadWriteGuard
|
||||
|
|
@ -142,7 +143,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
|
|||
*/
|
||||
abstract class RemoteClient private[akka] (
|
||||
val module: NettyRemoteClientModule,
|
||||
val remoteAddress: InetSocketAddress) extends Logging {
|
||||
val remoteAddress: InetSocketAddress) {
|
||||
|
||||
val name = this.getClass.getSimpleName + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort
|
||||
|
||||
|
|
@ -194,7 +195,6 @@ abstract class RemoteClient private[akka] (
|
|||
def send[T](
|
||||
request: RemoteMessageProtocol,
|
||||
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
|
||||
log.slf4j.debug("sending message: {} has future {}", request, senderFuture)
|
||||
if (isRunning) {
|
||||
if (request.getOneWay) {
|
||||
currentChannel.write(request).addListener(new ChannelFutureListener {
|
||||
|
|
@ -272,22 +272,17 @@ class ActiveRemoteClient private[akka] (
|
|||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
|
||||
log.slf4j.info("Starting remote client connection to [{}]", remoteAddress)
|
||||
|
||||
// Wait until the connection attempt succeeds or fails.
|
||||
connection = bootstrap.connect(remoteAddress)
|
||||
openChannels.add(connection.awaitUninterruptibly.getChannel)
|
||||
|
||||
if (!connection.isSuccess) {
|
||||
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
|
||||
log.slf4j.error("Remote client connection to [{}] has failed", remoteAddress)
|
||||
log.slf4j.debug("Remote client connection failed", connection.getCause)
|
||||
false
|
||||
} else {
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = {
|
||||
if(isRunning) {
|
||||
log.slf4j.debug("Reaping expired futures awaiting completion from [{}]", remoteAddress)
|
||||
val i = futures.entrySet.iterator
|
||||
while(i.hasNext) {
|
||||
val e = i.next
|
||||
|
|
@ -304,15 +299,12 @@ class ActiveRemoteClient private[akka] (
|
|||
case true => true
|
||||
case false if reconnectIfAlreadyConnected =>
|
||||
isAuthenticated.set(false)
|
||||
log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress)
|
||||
openChannels.remove(connection.getChannel)
|
||||
connection.getChannel.close
|
||||
connection = bootstrap.connect(remoteAddress)
|
||||
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
|
||||
if (!connection.isSuccess) {
|
||||
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
|
||||
log.slf4j.error("Reconnection to [{}] has failed", remoteAddress)
|
||||
log.slf4j.debug("Reconnection failed", connection.getCause)
|
||||
false
|
||||
} else true
|
||||
case false => false
|
||||
|
|
@ -320,7 +312,6 @@ class ActiveRemoteClient private[akka] (
|
|||
}
|
||||
|
||||
def shutdown = runSwitch switchOff {
|
||||
log.slf4j.info("Shutting down {}", name)
|
||||
notifyListeners(RemoteClientShutdown(module, remoteAddress))
|
||||
timer.stop
|
||||
timer = null
|
||||
|
|
@ -329,7 +320,6 @@ class ActiveRemoteClient private[akka] (
|
|||
bootstrap.releaseExternalResources
|
||||
bootstrap = null
|
||||
connection = null
|
||||
log.slf4j.info("{} has been shut down", name)
|
||||
}
|
||||
|
||||
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
|
||||
|
|
@ -339,7 +329,6 @@ class ActiveRemoteClient private[akka] (
|
|||
} else {
|
||||
val timeLeft = RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)
|
||||
if (timeLeft > 0) {
|
||||
log.slf4j.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft)
|
||||
true
|
||||
} else false
|
||||
}
|
||||
|
|
@ -399,12 +388,11 @@ class ActiveRemoteClientHandler(
|
|||
val remoteAddress: SocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
val client: ActiveRemoteClient)
|
||||
extends SimpleChannelUpstreamHandler with Logging {
|
||||
extends SimpleChannelUpstreamHandler {
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
log.slf4j.debug(event.toString)
|
||||
}
|
||||
super.handleUpstream(ctx, event)
|
||||
}
|
||||
|
|
@ -414,8 +402,6 @@ class ActiveRemoteClientHandler(
|
|||
event.getMessage match {
|
||||
case reply: RemoteMessageProtocol =>
|
||||
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
|
||||
log.slf4j.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
|
||||
log.slf4j.debug("Trying to map back to future: {}",replyUuid)
|
||||
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
|
||||
if (reply.hasMessage) {
|
||||
|
|
@ -442,9 +428,9 @@ class ActiveRemoteClientHandler(
|
|||
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
case e: Throwable =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
|
||||
log.slf4j.error("Unexpected exception in remote client handler", e)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
@ -465,7 +451,6 @@ class ActiveRemoteClientHandler(
|
|||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
def connect = {
|
||||
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
|
||||
log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress)
|
||||
client.resetReconnectionTimeWindow
|
||||
}
|
||||
|
||||
|
|
@ -482,16 +467,11 @@ class ActiveRemoteClientHandler(
|
|||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress))
|
||||
log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
client.notifyListeners(RemoteClientError(event.getCause, client.module, client.remoteAddress))
|
||||
if (event.getCause ne null)
|
||||
log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause)
|
||||
else
|
||||
log.slf4j.error("Unexpected exception from downstream in remote client: {}", event)
|
||||
|
||||
if (event.getCause ne null) event.getCause.printStackTrace
|
||||
event.getChannel.close
|
||||
}
|
||||
|
||||
|
|
@ -505,9 +485,8 @@ class ActiveRemoteClientHandler(
|
|||
.getConstructor(Array[Class[_]](classOf[String]): _*)
|
||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
} catch {
|
||||
case problem =>
|
||||
log.debug("Couldn't parse exception returned from RemoteServer",problem)
|
||||
log.warn("Couldn't create instance of {} with message {}, returning UnparsableException",classname, exception.getMessage)
|
||||
case problem: Throwable =>
|
||||
EventHandler notifyListeners EventHandler.Error(problem, this)
|
||||
UnparsableException(classname, exception.getMessage)
|
||||
}
|
||||
}
|
||||
|
|
@ -578,8 +557,8 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
|
|||
bootstrap.releaseExternalResources
|
||||
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
|
||||
} catch {
|
||||
case e: java.nio.channels.ClosedChannelException => {}
|
||||
case e => serverModule.log.slf4j.warn("Could not close remote server channel in a graceful way")
|
||||
case e: Exception =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -607,12 +586,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
|
||||
try {
|
||||
_isRunning switchOn {
|
||||
log.slf4j.debug("Starting up remote server on {}:{}",_hostname, _port)
|
||||
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
|
||||
}
|
||||
} catch {
|
||||
case e =>
|
||||
log.slf4j.error("Could not start up remote server", e)
|
||||
case e: Exception =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
notifyListeners(RemoteServerError(e, this))
|
||||
}
|
||||
this
|
||||
|
|
@ -622,7 +600,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
_isRunning switchOff {
|
||||
currentServer.getAndSet(None) foreach {
|
||||
instance =>
|
||||
log.slf4j.debug("Shutting down remote server on {}:{}",instance.host, instance.port)
|
||||
instance.shutdown
|
||||
}
|
||||
}
|
||||
|
|
@ -634,7 +611,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
* @param typedActor typed actor to register
|
||||
*/
|
||||
def registerTypedActor(id: String, typedActor: AnyRef): Unit = guard withGuard {
|
||||
log.slf4j.debug("Registering server side remote typed actor [{}] with id [{}]", typedActor.getClass.getName, id)
|
||||
if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid)
|
||||
else registerTypedActor(id, typedActor, typedActors)
|
||||
}
|
||||
|
|
@ -645,7 +621,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
* @param typedActor typed actor to register
|
||||
*/
|
||||
def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = guard withGuard {
|
||||
log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id)
|
||||
registerTypedPerSessionActor(id, () => factory, typedActorsFactories)
|
||||
}
|
||||
|
||||
|
|
@ -655,13 +630,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
|
||||
*/
|
||||
def register(id: String, actorRef: ActorRef): Unit = guard withGuard {
|
||||
log.slf4j.debug("Registering server side remote actor [{}] with id [{}]", actorRef.actorClass.getName, id)
|
||||
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
|
||||
else register(id, actorRef, actors)
|
||||
}
|
||||
|
||||
def registerByUuid(actorRef: ActorRef): Unit = guard withGuard {
|
||||
log.slf4j.debug("Registering remote actor {} to it's uuid {}", actorRef, actorRef.uuid)
|
||||
register(actorRef.uuid.toString, actorRef, actorsByUuid)
|
||||
}
|
||||
|
||||
|
|
@ -678,7 +651,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
|
||||
*/
|
||||
def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized {
|
||||
log.slf4j.debug("Registering server side remote session actor with id [{}]", id)
|
||||
registerPerSession(id, () => factory, actorsFactories)
|
||||
}
|
||||
|
||||
|
|
@ -702,7 +674,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
*/
|
||||
def unregister(actorRef: ActorRef): Unit = guard withGuard {
|
||||
if (_isRunning.isOn) {
|
||||
log.slf4j.debug("Unregistering server side remote actor [{}] with id [{}:{}]", Array[AnyRef](actorRef.actorClass.getName, actorRef.id, actorRef.uuid))
|
||||
actors.remove(actorRef.id, actorRef)
|
||||
actorsByUuid.remove(actorRef.uuid, actorRef)
|
||||
}
|
||||
|
|
@ -715,7 +686,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
*/
|
||||
def unregister(id: String): Unit = guard withGuard {
|
||||
if (_isRunning.isOn) {
|
||||
log.slf4j.info("Unregistering server side remote actor with id [{}]", id)
|
||||
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
|
||||
else {
|
||||
val actorRef = actors get id
|
||||
|
|
@ -732,7 +702,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
*/
|
||||
def unregisterPerSession(id: String): Unit = {
|
||||
if (_isRunning.isOn) {
|
||||
log.slf4j.info("Unregistering server side remote session actor with id [{}]", id)
|
||||
actorsFactories.remove(id)
|
||||
}
|
||||
}
|
||||
|
|
@ -744,7 +713,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
|
|||
*/
|
||||
def unregisterTypedActor(id: String):Unit = guard withGuard {
|
||||
if (_isRunning.isOn) {
|
||||
log.slf4j.info("Unregistering server side remote typed actor with id [{}]", id)
|
||||
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
|
||||
else typedActors.remove(id)
|
||||
}
|
||||
|
|
@ -818,7 +786,7 @@ class RemoteServerHandler(
|
|||
val name: String,
|
||||
val openChannels: ChannelGroup,
|
||||
val applicationLoader: Option[ClassLoader],
|
||||
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging {
|
||||
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
|
||||
import RemoteServerSettings._
|
||||
val CHANNEL_INIT = "channel-init".intern
|
||||
|
||||
|
|
@ -855,7 +823,6 @@ class RemoteServerHandler(
|
|||
val clientAddress = getClientAddress(ctx)
|
||||
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
|
||||
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
|
||||
log.slf4j.debug("Remote client [{}] connected to [{}]", clientAddress, server.name)
|
||||
if (SECURE) {
|
||||
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
// Begin handshake.
|
||||
|
|
@ -876,17 +843,16 @@ class RemoteServerHandler(
|
|||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
import scala.collection.JavaConversions.asScalaIterable
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
log.slf4j.debug("Remote client [{}] disconnected from [{}]", clientAddress, server.name)
|
||||
|
||||
// stop all session actors
|
||||
for (map <- Option(sessionActors.remove(event.getChannel));
|
||||
actor <- asScalaIterable(map.values)) {
|
||||
try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) }
|
||||
try { actor.stop } catch { case e: Exception => }
|
||||
}
|
||||
// stop all typed session actors
|
||||
for (map <- Option(typedSessionActors.remove(event.getChannel));
|
||||
actor <- asScalaIterable(map.values)) {
|
||||
try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) }
|
||||
try { TypedActor.stop(actor) } catch { case e: Exception => }
|
||||
}
|
||||
|
||||
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
|
||||
|
|
@ -894,13 +860,11 @@ class RemoteServerHandler(
|
|||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
log.slf4j.debug("Remote client [{}] channel closed from [{}]", clientAddress, server.name)
|
||||
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
|
||||
}
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
log.slf4j.debug(event.toString)
|
||||
}
|
||||
super.handleUpstream(ctx, event)
|
||||
}
|
||||
|
|
@ -914,7 +878,6 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
log.slf4j.error("Unexpected exception from remote downstream", event.getCause)
|
||||
event.getChannel.close
|
||||
server.notifyListeners(RemoteServerError(event.getCause, server))
|
||||
}
|
||||
|
|
@ -926,7 +889,6 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
|
||||
log.slf4j.debug("Received RemoteMessageProtocol[\n{}]",request)
|
||||
request.getActorInfo.getActorType match {
|
||||
case SCALA_ACTOR => dispatchToActor(request, channel)
|
||||
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
|
||||
|
|
@ -937,11 +899,11 @@ class RemoteServerHandler(
|
|||
|
||||
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
|
||||
val actorInfo = request.getActorInfo
|
||||
log.slf4j.debug("Dispatching to remote actor [{}:{}]", actorInfo.getTarget, actorInfo.getUuid)
|
||||
|
||||
val actorRef =
|
||||
try { createActor(actorInfo, channel).start } catch {
|
||||
case e: SecurityException =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
write(channel, createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
return
|
||||
|
|
@ -967,16 +929,13 @@ class RemoteServerHandler(
|
|||
None,
|
||||
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout).
|
||||
onComplete(f => {
|
||||
log.slf4j.debug("Future was completed, now flushing to remote!")
|
||||
val result = f.result
|
||||
val exception = f.exception
|
||||
|
||||
if (exception.isDefined) {
|
||||
log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass)
|
||||
write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
|
||||
}
|
||||
else if (result.isDefined) {
|
||||
log.slf4j.debug("Returning result from actor invocation [{}]",result.get)
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef),
|
||||
Right(request.getUuid),
|
||||
|
|
@ -1004,7 +963,6 @@ class RemoteServerHandler(
|
|||
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
|
||||
val actorInfo = request.getActorInfo
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
log.slf4j.debug("Dispatching to remote typed actor [{} :: {}]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
||||
|
||||
val typedActor = createTypedActor(actorInfo, channel)
|
||||
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
|
||||
|
|
@ -1031,9 +989,10 @@ class RemoteServerHandler(
|
|||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
||||
write(channel, messageBuilder.build)
|
||||
log.slf4j.debug("Returning result from remote typed actor invocation [{}]", result)
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
case e: Exception =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
|
||||
messageReceiver.invoke(typedActor, args: _*) match {
|
||||
|
|
@ -1048,9 +1007,11 @@ class RemoteServerHandler(
|
|||
}
|
||||
} catch {
|
||||
case e: InvocationTargetException =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
write(channel, createErrorReplyMessage(e.getCause, request, AkkaActorType.TypedActor))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
case e: Throwable =>
|
||||
case e: Exception =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
write(channel, createErrorReplyMessage(e, request, AkkaActorType.TypedActor))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
|
|
@ -1100,7 +1061,6 @@ class RemoteServerHandler(
|
|||
if (UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
|
||||
log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
|
||||
|
|
@ -1110,8 +1070,8 @@ class RemoteServerHandler(
|
|||
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.slf4j.error("Could not create remote actor instance", e)
|
||||
case e: Throwable =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
|
|
@ -1167,8 +1127,6 @@ class RemoteServerHandler(
|
|||
if (UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
|
||||
log.slf4j.info("Creating a new remote typed actor:\n\t[{} :: {}]", interfaceClassname, targetClassname)
|
||||
|
||||
val (interfaceClass, targetClass) =
|
||||
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
|
||||
applicationLoader.get.loadClass(targetClassname))
|
||||
|
|
@ -1179,8 +1137,8 @@ class RemoteServerHandler(
|
|||
server.typedActors.put(parseUuid(uuid).toString, newInstance) // register by uuid
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
log.slf4j.error("Could not create remote typed actor instance", e)
|
||||
case e: Throwable =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
|
|
@ -1201,8 +1159,6 @@ class RemoteServerHandler(
|
|||
|
||||
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
|
||||
val actorInfo = request.getActorInfo
|
||||
log.slf4j.error("Could not invoke remote actor [{}]", actorInfo.getTarget)
|
||||
log.slf4j.debug("Could not invoke remote actor", exception)
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
Right(request.getUuid),
|
||||
|
|
@ -1230,7 +1186,6 @@ class RemoteServerHandler(
|
|||
"The remote client [" + clientAddress + "] does not have a secure cookie.")
|
||||
if (!(request.getCookie == SECURE_COOKIE.get)) throw new SecurityException(
|
||||
"The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie")
|
||||
log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue