Merge branch 'master' into ticket_419
This commit is contained in:
commit
e43d9b2b61
57 changed files with 2776 additions and 1576 deletions
|
|
@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
|
|||
import java.util.{Map => JMap}
|
||||
|
||||
import se.scalablesolutions.akka.actor.{
|
||||
Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage}
|
||||
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
|
|
@ -133,8 +133,8 @@ object RemoteServer {
|
|||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||
}
|
||||
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor)
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
|
||||
}
|
||||
|
||||
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
||||
|
|
@ -245,12 +245,12 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
||||
_isRunning = true
|
||||
Cluster.registerLocalNode(hostname, port)
|
||||
foreachListener(_ ! RemoteServerStarted(this))
|
||||
notifyListeners(RemoteServerStarted(this))
|
||||
}
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not start up remote server")
|
||||
foreachListener(_ ! RemoteServerError(e, this))
|
||||
notifyListeners(RemoteServerError(e, this))
|
||||
}
|
||||
this
|
||||
}
|
||||
|
|
@ -263,7 +263,7 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
foreachListener(_ ! RemoteServerShutdown(this))
|
||||
notifyListeners(RemoteServerShutdown(this))
|
||||
} catch {
|
||||
case e: java.nio.channels.ClosedChannelException => {}
|
||||
case e => log.warning("Could not close remote server channel in a graceful way")
|
||||
|
|
@ -271,12 +271,28 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: register typed actor in RemoteServer as well
|
||||
/**
|
||||
* Register typed actor by interface name.
|
||||
*/
|
||||
def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
|
||||
|
||||
/**
|
||||
* Register Remote Actor by the Actor's 'uuid' field. It starts the Actor if it is not started already.
|
||||
* Register remote typed actor by a specific id.
|
||||
* @param id custom actor id
|
||||
* @param typedActor typed actor to register
|
||||
*/
|
||||
def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef)
|
||||
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
|
||||
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
|
||||
if (!typedActors.contains(id)) {
|
||||
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
|
||||
typedActors.put(id, typedActor)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
||||
*/
|
||||
def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
|
||||
|
||||
/**
|
||||
* Register Remote Actor by a specific 'id' passed as argument.
|
||||
|
|
@ -321,16 +337,25 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Typed Actor by specific 'id'.
|
||||
* <p/>
|
||||
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
||||
*/
|
||||
def unregisterTypedActor(id: String):Unit = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote typed actor with id [%s]", id)
|
||||
val registeredTypedActors = typedActors()
|
||||
registeredTypedActors.remove(id)
|
||||
}
|
||||
}
|
||||
|
||||
protected override def manageLifeCycleOfListeners = false
|
||||
|
||||
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
|
||||
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
|
||||
|
||||
private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = {
|
||||
RemoteServer.actorsFor(address).actors
|
||||
}
|
||||
private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = {
|
||||
RemoteServer.actorsFor(address).typedActors
|
||||
}
|
||||
private[akka] def actors() = RemoteServer.actorsFor(address).actors
|
||||
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
|
||||
}
|
||||
|
||||
object RemoteServerSslContext {
|
||||
|
|
@ -413,18 +438,18 @@ class RemoteServerHandler(
|
|||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) {
|
||||
openChannels.add(future.getChannel)
|
||||
server.foreachListener(_ ! RemoteServerClientConnected(server))
|
||||
server.notifyListeners(RemoteServerClientConnected(server))
|
||||
} else future.getChannel.close
|
||||
}
|
||||
})
|
||||
} else {
|
||||
server.foreachListener(_ ! RemoteServerClientConnected(server))
|
||||
server.notifyListeners(RemoteServerClientConnected(server))
|
||||
}
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
log.debug("Remote client disconnected from [%s]", server.name)
|
||||
server.foreachListener(_ ! RemoteServerClientDisconnected(server))
|
||||
server.notifyListeners(RemoteServerClientDisconnected(server))
|
||||
}
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
|
|
@ -446,7 +471,7 @@ class RemoteServerHandler(
|
|||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
log.error(event.getCause, "Unexpected exception from remote downstream")
|
||||
event.getChannel.close
|
||||
server.foreachListener(_ ! RemoteServerError(event.getCause, server))
|
||||
server.notifyListeners(RemoteServerError(event.getCause, server))
|
||||
}
|
||||
|
||||
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
||||
|
|
@ -491,7 +516,7 @@ class RemoteServerHandler(
|
|||
} catch {
|
||||
case e: Throwable =>
|
||||
channel.write(createErrorReplyMessage(e, request, true))
|
||||
server.foreachListener(_ ! RemoteServerError(e, server))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -523,13 +548,39 @@ class RemoteServerHandler(
|
|||
} catch {
|
||||
case e: InvocationTargetException =>
|
||||
channel.write(createErrorReplyMessage(e.getCause, request, false))
|
||||
server.foreachListener(_ ! RemoteServerError(e, server))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
case e: Throwable =>
|
||||
channel.write(createErrorReplyMessage(e, request, false))
|
||||
server.foreachListener(_ ! RemoteServerError(e, server))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a registered actor by ID (default) or UUID.
|
||||
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
|
||||
*/
|
||||
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
|
||||
val registeredActors = server.actors()
|
||||
var actorRefOrNull = registeredActors get id
|
||||
if (actorRefOrNull eq null) {
|
||||
actorRefOrNull = registeredActors get uuid
|
||||
}
|
||||
actorRefOrNull
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a registered typed actor by ID (default) or UUID.
|
||||
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
|
||||
*/
|
||||
private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = {
|
||||
val registeredActors = server.typedActors()
|
||||
var actorRefOrNull = registeredActors get id
|
||||
if (actorRefOrNull eq null) {
|
||||
actorRefOrNull = registeredActors get uuid
|
||||
}
|
||||
actorRefOrNull
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
|
||||
*
|
||||
|
|
@ -538,12 +589,14 @@ class RemoteServerHandler(
|
|||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val ids = actorInfo.getUuid.split(':')
|
||||
val uuid = ids(0)
|
||||
val id = ids(1)
|
||||
|
||||
val name = actorInfo.getTarget
|
||||
val timeout = actorInfo.getTimeout
|
||||
|
||||
val registeredActors = server.actors()
|
||||
val actorRefOrNull = registeredActors get uuid
|
||||
val actorRefOrNull = findActorByIdOrUuid(id, uuid)
|
||||
|
||||
if (actorRefOrNull eq null) {
|
||||
try {
|
||||
|
|
@ -552,23 +605,26 @@ class RemoteServerHandler(
|
|||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor])
|
||||
actorRef.uuid = uuid
|
||||
actorRef.id = id
|
||||
actorRef.timeout = timeout
|
||||
actorRef.remoteAddress = None
|
||||
registeredActors.put(uuid, actorRef)
|
||||
server.actors.put(id, actorRef) // register by id
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
server.foreachListener(_ ! RemoteServerError(e, server))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
} else actorRefOrNull
|
||||
}
|
||||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val registeredTypedActors = server.typedActors()
|
||||
val typedActorOrNull = registeredTypedActors get uuid
|
||||
val ids = actorInfo.getUuid.split(':')
|
||||
val uuid = ids(0)
|
||||
val id = ids(1)
|
||||
|
||||
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid)
|
||||
|
||||
if (typedActorOrNull eq null) {
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
|
|
@ -585,12 +641,12 @@ class RemoteServerHandler(
|
|||
|
||||
val newInstance = TypedActor.newInstance(
|
||||
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
|
||||
registeredTypedActors.put(uuid, newInstance)
|
||||
server.typedActors.put(id, newInstance) // register by id
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote typed actor instance")
|
||||
server.foreachListener(_ ! RemoteServerError(e, server))
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
} else typedActorOrNull
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue