Merge branch 'master' into mist
This commit is contained in:
commit
dcbdfcc67b
16 changed files with 762 additions and 198 deletions
|
|
@ -139,8 +139,24 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
|||
def setFaultHandler(handler: FaultHandlingStrategy)
|
||||
def getFaultHandler(): FaultHandlingStrategy
|
||||
|
||||
@volatile
|
||||
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
|
||||
|
||||
/**
|
||||
* Akka Java API
|
||||
* A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent)
|
||||
* <p/>
|
||||
* Can be one of:
|
||||
*
|
||||
* import static akka.config.Supervision.*;
|
||||
* <pre>
|
||||
* getContext().setLifeCycle(permanent());
|
||||
* </pre>
|
||||
* Or:
|
||||
* <pre>
|
||||
* getContext().setLifeCycle(temporary());
|
||||
* </pre>
|
||||
*/
|
||||
def setLifeCycle(lifeCycle: LifeCycle): Unit
|
||||
def getLifeCycle(): LifeCycle
|
||||
|
||||
/**
|
||||
* Akka Java API
|
||||
|
|
@ -597,6 +613,8 @@ class LocalActorRef private[akka] (
|
|||
private var restartsWithinTimeRangeTimestamp: Long = 0L
|
||||
@volatile
|
||||
private var _mailbox: AnyRef = _
|
||||
@volatile
|
||||
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
|
||||
|
||||
protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
|
||||
|
||||
|
|
@ -1208,6 +1226,8 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
|
||||
ensureRemotingEnabled
|
||||
|
||||
val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port))
|
||||
|
||||
id = classOrServiceName
|
||||
timeout = _timeout
|
||||
|
||||
|
|
@ -1242,8 +1262,6 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
|
||||
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
|
||||
|
||||
val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port))
|
||||
|
||||
// ==== NOT SUPPORTED ====
|
||||
def actorClass: Class[_ <: Actor] = unsupported
|
||||
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
|
||||
|
|
@ -1313,9 +1331,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
|
|||
*/
|
||||
def id: String
|
||||
|
||||
def id_=(id: String): Unit
|
||||
|
||||
/**
|
||||
def id_=(id: String): Unit /**
|
||||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
* Defines the life-cycle for a supervised actor.
|
||||
|
|
|
|||
|
|
@ -37,10 +37,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object ActorRegistry extends ListenerManagement {
|
||||
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
|
||||
private val actorsById = new Index[String,ActorRef]
|
||||
private val remoteActorSets = Map[Address, RemoteActorSet]()
|
||||
private val guard = new ReadWriteGuard
|
||||
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
|
||||
private val actorsById = new Index[String,ActorRef]
|
||||
private val remoteActorSets = Map[Address, RemoteActorSet]()
|
||||
private val guard = new ReadWriteGuard
|
||||
|
||||
/**
|
||||
* Returns all actors in the system.
|
||||
|
|
@ -50,7 +50,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
/**
|
||||
* Returns the number of actors in the system.
|
||||
*/
|
||||
def size : Int = actorsByUUID.size
|
||||
def size: Int = actorsByUUID.size
|
||||
|
||||
/**
|
||||
* Invokes a function for all actors.
|
||||
|
|
@ -68,8 +68,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val element = elements.nextElement
|
||||
if(f isDefinedAt element)
|
||||
return Some(f(element))
|
||||
if (f isDefinedAt element) return Some(f(element))
|
||||
}
|
||||
None
|
||||
}
|
||||
|
|
@ -88,9 +87,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val actorId = elements.nextElement
|
||||
if (p(actorId)) {
|
||||
all += actorId
|
||||
}
|
||||
if (p(actorId)) all += actorId
|
||||
}
|
||||
all.toArray
|
||||
}
|
||||
|
|
@ -105,7 +102,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds any actor that matches T.
|
||||
*/
|
||||
def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
|
||||
find({ case a:ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a })
|
||||
find({ case a: ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a })
|
||||
|
||||
/**
|
||||
* Finds all actors of type or sub-type specified by the class passed in as the Class argument.
|
||||
|
|
@ -132,13 +129,11 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Invokes a function for all typed actors.
|
||||
*/
|
||||
def foreachTypedActor(f: (AnyRef) => Unit) = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
TypedActorModule.ensureEnabled
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val proxy = typedActorFor(elements.nextElement)
|
||||
if (proxy.isDefined) {
|
||||
f(proxy.get)
|
||||
}
|
||||
if (proxy.isDefined) f(proxy.get)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -147,12 +142,11 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Returns None if the function never returns Some
|
||||
*/
|
||||
def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
TypedActorModule.ensureEnabled
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val proxy = typedActorFor(elements.nextElement)
|
||||
if(proxy.isDefined && (f isDefinedAt proxy))
|
||||
return Some(f(proxy))
|
||||
if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy))
|
||||
}
|
||||
None
|
||||
}
|
||||
|
|
@ -161,14 +155,12 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds all typed actors that satisfy a predicate.
|
||||
*/
|
||||
def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
TypedActorModule.ensureEnabled
|
||||
val all = new ListBuffer[AnyRef]
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val proxy = typedActorFor(elements.nextElement)
|
||||
if (proxy.isDefined && p(proxy.get)) {
|
||||
all += proxy.get
|
||||
}
|
||||
if (proxy.isDefined && p(proxy.get)) all += proxy.get
|
||||
}
|
||||
all.toArray
|
||||
}
|
||||
|
|
@ -177,7 +169,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds all typed actors that are subtypes of the class passed in as the Manifest argument.
|
||||
*/
|
||||
def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
TypedActorModule.ensureEnabled
|
||||
typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]])
|
||||
}
|
||||
|
||||
|
|
@ -185,20 +177,20 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds any typed actor that matches T.
|
||||
*/
|
||||
def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
def predicate(proxy: AnyRef) : Boolean = {
|
||||
TypedActorModule.ensureEnabled
|
||||
def predicate(proxy: AnyRef): Boolean = {
|
||||
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
|
||||
actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass)
|
||||
}
|
||||
findTypedActor({ case a:AnyRef if predicate(a) => a })
|
||||
findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a })
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
|
||||
*/
|
||||
def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
def predicate(proxy: AnyRef) : Boolean = {
|
||||
TypedActorModule.ensureEnabled
|
||||
def predicate(proxy: AnyRef): Boolean = {
|
||||
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
|
||||
actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
|
||||
}
|
||||
|
|
@ -209,7 +201,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds all typed actors that have a specific id.
|
||||
*/
|
||||
def typedActorsFor(id: String): Array[AnyRef] = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
TypedActorModule.ensureEnabled
|
||||
val actorRefs = actorsById values id
|
||||
actorRefs.flatMap(typedActorFor(_))
|
||||
}
|
||||
|
|
@ -218,12 +210,10 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Finds the typed actor that has a specific UUID.
|
||||
*/
|
||||
def typedActorFor(uuid: Uuid): Option[AnyRef] = {
|
||||
TypedActorModule.ensureTypedActorEnabled
|
||||
TypedActorModule.ensureEnabled
|
||||
val actorRef = actorsByUUID get uuid
|
||||
if (actorRef eq null)
|
||||
None
|
||||
else
|
||||
typedActorFor(actorRef)
|
||||
if (actorRef eq null) None
|
||||
else typedActorFor(actorRef)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -265,20 +255,15 @@ object ActorRegistry extends ListenerManagement {
|
|||
*/
|
||||
def shutdownAll() {
|
||||
log.info("Shutting down all actors in the system...")
|
||||
if (TypedActorModule.isTypedActorEnabled) {
|
||||
if (TypedActorModule.isEnabled) {
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val actorRef = elements.nextElement
|
||||
val proxy = typedActorFor(actorRef)
|
||||
if (proxy.isDefined) {
|
||||
TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
|
||||
} else {
|
||||
actorRef.stop
|
||||
}
|
||||
if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
|
||||
else actorRef.stop
|
||||
}
|
||||
} else {
|
||||
foreach(_.stop)
|
||||
}
|
||||
} else foreach(_.stop)
|
||||
actorsByUUID.clear
|
||||
actorsById.clear
|
||||
log.info("All actors have been shut down and unregistered from ActorRegistry")
|
||||
|
|
@ -301,14 +286,18 @@ object ActorRegistry extends ListenerManagement {
|
|||
|
||||
private[akka] def actors(address: Address) = actorsFor(address).actors
|
||||
private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid
|
||||
private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories
|
||||
private[akka] def typedActors(address: Address) = actorsFor(address).typedActors
|
||||
private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid
|
||||
private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories
|
||||
|
||||
private[akka] class RemoteActorSet {
|
||||
private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
|
||||
private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
|
||||
private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||
private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
|
||||
private[ActorRegistry] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -337,16 +326,13 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
|
|||
|
||||
if (set ne null) {
|
||||
set.synchronized {
|
||||
if (set.isEmpty) {
|
||||
retry = true //IF the set is empty then it has been removed, so signal retry
|
||||
}
|
||||
if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
|
||||
else { //Else add the value to the set and signal that retry is not needed
|
||||
added = set add v
|
||||
retry = false
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
val newSet = new ConcurrentSkipListSet[V]
|
||||
newSet add v
|
||||
|
||||
|
|
@ -354,24 +340,20 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
|
|||
val oldSet = container.putIfAbsent(k,newSet)
|
||||
if (oldSet ne null) {
|
||||
oldSet.synchronized {
|
||||
if (oldSet.isEmpty) {
|
||||
retry = true //IF the set is empty then it has been removed, so signal retry
|
||||
}
|
||||
if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
|
||||
else { //Else try to add the value to the set and signal that retry is not needed
|
||||
added = oldSet add v
|
||||
retry = false
|
||||
}
|
||||
}
|
||||
} else {
|
||||
added = true
|
||||
}
|
||||
} else added = true
|
||||
}
|
||||
|
||||
if (retry) spinPut(k,v)
|
||||
if (retry) spinPut(k, v)
|
||||
else added
|
||||
}
|
||||
|
||||
spinPut(key,value)
|
||||
spinPut(key, value)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -390,10 +372,8 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
|
|||
def findValue(key: K)(f: (V) => Boolean): Option[V] = {
|
||||
import scala.collection.JavaConversions._
|
||||
val set = container get key
|
||||
if (set ne null)
|
||||
set.iterator.find(f)
|
||||
else
|
||||
None
|
||||
if (set ne null) set.iterator.find(f)
|
||||
else None
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -420,8 +400,7 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
|
|||
container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set
|
||||
|
||||
true //Remove succeeded
|
||||
}
|
||||
else false //Remove failed
|
||||
} else false //Remove failed
|
||||
}
|
||||
} else false //Remove failed
|
||||
}
|
||||
|
|
@ -434,5 +413,5 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
|
|||
/**
|
||||
* Removes all keys and all values
|
||||
*/
|
||||
def clear = foreach { case (k,v) => remove(k,v) }
|
||||
def clear = foreach { case (k, v) => remove(k, v) }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.dispatch
|
||||
|
||||
import akka.actor.{ActorRef, IllegalActorStateException}
|
||||
import akka.util.ReflectiveAccess.EnterpriseModule
|
||||
import akka.util.ReflectiveAccess.AkkaCloudModule
|
||||
|
||||
import java.util.Queue
|
||||
import akka.util.Switch
|
||||
|
|
@ -123,10 +123,10 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
*/
|
||||
def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match {
|
||||
// FIXME make generic (work for TypedActor as well)
|
||||
case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case FileBasedDurableMailbox(serializer) => AkkaCloudModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case ZooKeeperBasedDurableMailbox(serializer) => AkkaCloudModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case BeanstalkBasedDurableMailbox(serializer) => AkkaCloudModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case RedisBasedDurableMailbox(serializer) => AkkaCloudModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
|
||||
case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported")
|
||||
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.dispatch
|
||||
|
||||
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
|
||||
import akka.util.ReflectiveAccess.EnterpriseModule
|
||||
import akka.util.ReflectiveAccess.AkkaCloudModule
|
||||
import akka.AkkaException
|
||||
|
||||
import java.util.{Queue, List}
|
||||
|
|
@ -42,15 +42,15 @@ case class BoundedMailbox(
|
|||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
}
|
||||
|
||||
abstract class DurableMailboxType(val serializer: EnterpriseModule.Serializer) extends MailboxType {
|
||||
abstract class DurableMailboxType(val serializer: AkkaCloudModule.Serializer) extends MailboxType {
|
||||
if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null")
|
||||
}
|
||||
case class FileBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class RedisBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class BeanstalkBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class ZooKeeperBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class AMQPBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class JMSBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class FileBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class RedisBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class BeanstalkBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class ZooKeeperBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class AMQPBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
|
||||
case class JMSBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
|
||||
|
||||
class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
|
||||
extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {
|
||||
|
|
|
|||
|
|
@ -20,13 +20,13 @@ object ReflectiveAccess extends Logging {
|
|||
|
||||
val loader = getClass.getClassLoader
|
||||
|
||||
lazy val isRemotingEnabled = RemoteClientModule.isRemotingEnabled
|
||||
lazy val isTypedActorEnabled = TypedActorModule.isTypedActorEnabled
|
||||
lazy val isEnterpriseEnabled = EnterpriseModule.isEnterpriseEnabled
|
||||
lazy val isRemotingEnabled = RemoteClientModule.isEnabled
|
||||
lazy val isTypedActorEnabled = TypedActorModule.isEnabled
|
||||
lazy val isAkkaCloudEnabled = AkkaCloudModule.isEnabled
|
||||
|
||||
def ensureRemotingEnabled = RemoteClientModule.ensureRemotingEnabled
|
||||
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled
|
||||
def ensureEnterpriseEnabled = EnterpriseModule.ensureEnterpriseEnabled
|
||||
def ensureRemotingEnabled = RemoteClientModule.ensureEnabled
|
||||
def ensureTypedActorEnabled = TypedActorModule.ensureEnabled
|
||||
def ensureAkkaCloudEnabled = AkkaCloudModule.ensureEnabled
|
||||
|
||||
/**
|
||||
* Reflective access to the RemoteClient module.
|
||||
|
|
@ -56,32 +56,32 @@ object ReflectiveAccess extends Logging {
|
|||
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
|
||||
}
|
||||
|
||||
lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined
|
||||
lazy val isEnabled = remoteClientObjectInstance.isDefined
|
||||
|
||||
def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException(
|
||||
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
|
||||
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
|
||||
|
||||
val remoteClientObjectInstance: Option[RemoteClientObject] =
|
||||
getObjectFor("akka.remote.RemoteClient$")
|
||||
|
||||
def register(address: InetSocketAddress, uuid: Uuid) = {
|
||||
ensureRemotingEnabled
|
||||
ensureEnabled
|
||||
remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid)
|
||||
}
|
||||
|
||||
def unregister(address: InetSocketAddress, uuid: Uuid) = {
|
||||
ensureRemotingEnabled
|
||||
ensureEnabled
|
||||
remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid)
|
||||
}
|
||||
|
||||
def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = {
|
||||
ensureRemotingEnabled
|
||||
ensureEnabled
|
||||
val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress)
|
||||
remoteClient.registerSupervisorForActor(actorRef)
|
||||
}
|
||||
|
||||
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = {
|
||||
ensureRemotingEnabled
|
||||
ensureEnabled
|
||||
remoteClientObjectInstance.get.clientFor(hostname, port, loader)
|
||||
}
|
||||
|
||||
|
|
@ -95,7 +95,7 @@ object ReflectiveAccess extends Logging {
|
|||
actorRef: ActorRef,
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: ActorType): Option[CompletableFuture[T]] = {
|
||||
ensureRemotingEnabled
|
||||
ensureEnabled
|
||||
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
|
||||
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
|
||||
}
|
||||
|
|
@ -126,17 +126,17 @@ object ReflectiveAccess extends Logging {
|
|||
getObjectFor("akka.remote.RemoteNode$")
|
||||
|
||||
def registerActor(address: InetSocketAddress, actorRef: ActorRef) = {
|
||||
ensureRemotingEnabled
|
||||
RemoteClientModule.ensureEnabled
|
||||
remoteServerObjectInstance.get.registerActor(address, actorRef)
|
||||
}
|
||||
|
||||
def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = {
|
||||
ensureRemotingEnabled
|
||||
RemoteClientModule.ensureEnabled
|
||||
remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy)
|
||||
}
|
||||
|
||||
def unregister(actorRef: ActorRef) = {
|
||||
ensureRemotingEnabled
|
||||
RemoteClientModule.ensureEnabled
|
||||
remoteNodeObjectInstance.get.unregister(actorRef)
|
||||
}
|
||||
}
|
||||
|
|
@ -156,16 +156,16 @@ object ReflectiveAccess extends Logging {
|
|||
def stop(anyRef: AnyRef) : Unit
|
||||
}
|
||||
|
||||
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined
|
||||
lazy val isEnabled = typedActorObjectInstance.isDefined
|
||||
|
||||
def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
|
||||
def ensureEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
|
||||
"Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath")
|
||||
|
||||
val typedActorObjectInstance: Option[TypedActorObject] =
|
||||
getObjectFor("akka.actor.TypedActor$")
|
||||
|
||||
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
|
||||
ensureTypedActorEnabled
|
||||
ensureEnabled
|
||||
if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) {
|
||||
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
|
||||
}
|
||||
|
|
@ -173,7 +173,7 @@ object ReflectiveAccess extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
object EnterpriseModule {
|
||||
object AkkaCloudModule {
|
||||
|
||||
type Mailbox = {
|
||||
def enqueue(message: MessageInvocation)
|
||||
|
|
@ -185,27 +185,27 @@ object ReflectiveAccess extends Logging {
|
|||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
|
||||
}
|
||||
|
||||
lazy val isEnterpriseEnabled = clusterObjectInstance.isDefined
|
||||
lazy val isEnabled = clusterObjectInstance.isDefined
|
||||
|
||||
val clusterObjectInstance: Option[AnyRef] =
|
||||
getObjectFor("akka.cluster.Cluster$")
|
||||
getObjectFor("akka.cloud.cluster.Cluster$")
|
||||
|
||||
val serializerClass: Option[Class[_]] =
|
||||
getClassFor("akka.serialization.Serializer")
|
||||
|
||||
def ensureEnterpriseEnabled = if (!isEnterpriseEnabled) throw new ModuleNotAvailableException(
|
||||
"Feature is only available in Akka Enterprise edition")
|
||||
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
|
||||
"Feature is only available in Akka Cloud")
|
||||
|
||||
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.FileBasedMailbox", actorRef)
|
||||
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.FileBasedMailbox", actorRef)
|
||||
|
||||
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.ZooKeeperBasedMailbox", actorRef)
|
||||
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.ZooKeeperBasedMailbox", actorRef)
|
||||
|
||||
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.BeanstalkBasedMailbox", actorRef)
|
||||
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.BeanstalkBasedMailbox", actorRef)
|
||||
|
||||
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.RedisBasedMailbox", actorRef)
|
||||
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.RedisBasedMailbox", actorRef)
|
||||
|
||||
private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = {
|
||||
ensureEnterpriseEnabled
|
||||
ensureEnabled
|
||||
createInstance(
|
||||
mailboxClassname,
|
||||
Array(classOf[ActorRef]),
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@ object ActorRegistrySpec {
|
|||
self.reply("got ping")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ActorRegistrySpec extends JUnitSuite {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
|
|||
override protected def beforeAll(): Unit = {
|
||||
|
||||
try {
|
||||
val dir = "./akka-persistence/akka-persistence-voldemort/target/scala_2.8.0/test-resources"
|
||||
val dir = "./akka-persistence/akka-persistence-voldemort/target/test-resources"
|
||||
val home = new File(dir)
|
||||
log.info("Creating Voldemort Config")
|
||||
val config = VoldemortConfig.loadFromVoldemortHome(home.getCanonicalPath)
|
||||
|
|
|
|||
|
|
@ -284,6 +284,21 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
else registerTypedActor(id, typedActor, typedActors)
|
||||
}
|
||||
|
||||
/**
|
||||
* Register typed actor by interface name.
|
||||
*/
|
||||
def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef) : Unit = registerTypedActor(intfClass.getName, factory)
|
||||
|
||||
/**
|
||||
* Register remote typed actor by a specific id.
|
||||
* @param id custom actor id
|
||||
* @param typedActor typed actor to register
|
||||
*/
|
||||
def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = synchronized {
|
||||
log.debug("Registering server side typed remote session actor with id [%s]", id)
|
||||
registerTypedPerSessionActor(id, () => factory, typedActorsFactories)
|
||||
}
|
||||
|
||||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
||||
*/
|
||||
|
|
@ -300,15 +315,36 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
else register(id, actorRef, actors)
|
||||
}
|
||||
|
||||
/**
|
||||
* Register Remote Session Actor by a specific 'id' passed as argument.
|
||||
* <p/>
|
||||
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
|
||||
*/
|
||||
def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized {
|
||||
log.debug("Registering server side remote session actor with id [%s]", id)
|
||||
registerPerSession(id, () => factory, actorsFactories)
|
||||
}
|
||||
|
||||
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
|
||||
if (_isRunning && !registry.contains(id)) {
|
||||
if (_isRunning) {
|
||||
registry.put(id, actorRef) //TODO change to putIfAbsent
|
||||
if (!actorRef.isRunning) actorRef.start
|
||||
registry.put(id, actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
private def registerPerSession[Key](id: Key, factory: () => ActorRef, registry: ConcurrentHashMap[Key,() => ActorRef]) {
|
||||
if (_isRunning)
|
||||
registry.put(id, factory) //TODO change to putIfAbsent
|
||||
}
|
||||
|
||||
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
|
||||
if (_isRunning && !registry.contains(id)) registry.put(id, typedActor)
|
||||
if (_isRunning)
|
||||
registry.put(id, typedActor) //TODO change to putIfAbsent
|
||||
}
|
||||
|
||||
private def registerTypedPerSessionActor[Key](id: Key, factory: () => AnyRef, registry: ConcurrentHashMap[Key,() => AnyRef]) {
|
||||
if (_isRunning)
|
||||
registry.put(id, factory) //TODO change to putIfAbsent
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -339,6 +375,18 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Actor by specific 'id'.
|
||||
* <p/>
|
||||
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
||||
*/
|
||||
def unregisterPerSession(id: String):Unit = {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote session actor with id [%s]", id)
|
||||
actorsFactories.remove(id)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Typed Actor by specific 'id'.
|
||||
* <p/>
|
||||
|
|
@ -352,14 +400,28 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister Remote Typed Actor by specific 'id'.
|
||||
* <p/>
|
||||
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
||||
*/
|
||||
def unregisterTypedPerSessionActor(id: String):Unit = {
|
||||
if (_isRunning) {
|
||||
typedActorsFactories.remove(id)
|
||||
}
|
||||
}
|
||||
|
||||
protected override def manageLifeCycleOfListeners = false
|
||||
|
||||
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
|
||||
|
||||
|
||||
private[akka] def actors = ActorRegistry.actors(address)
|
||||
private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address)
|
||||
private[akka] def actorsFactories = ActorRegistry.actorsFactories(address)
|
||||
private[akka] def typedActors = ActorRegistry.typedActors(address)
|
||||
private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address)
|
||||
private[akka] def typedActorsFactories = ActorRegistry.typedActorsFactories(address)
|
||||
}
|
||||
|
||||
object RemoteServerSslContext {
|
||||
|
|
@ -427,6 +489,9 @@ class RemoteServerHandler(
|
|||
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
||||
val CHANNEL_INIT = "channel-init".intern
|
||||
|
||||
val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]()
|
||||
val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]()
|
||||
|
||||
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
|
||||
|
||||
/**
|
||||
|
|
@ -437,6 +502,8 @@ class RemoteServerHandler(
|
|||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
|
||||
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
|
||||
log.debug("Remote client [%s] connected to [%s]", clientAddress, server.name)
|
||||
if (RemoteServer.SECURE) {
|
||||
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
|
|
@ -456,6 +523,22 @@ class RemoteServerHandler(
|
|||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx)
|
||||
log.debug("Remote client [%s] disconnected from [%s]", clientAddress, server.name)
|
||||
// stop all session actors
|
||||
val channelActors = sessionActors.remove(event.getChannel)
|
||||
if (channelActors ne null) {
|
||||
val channelActorsIterator = channelActors.elements
|
||||
while (channelActorsIterator.hasMoreElements) {
|
||||
channelActorsIterator.nextElement.stop
|
||||
}
|
||||
}
|
||||
|
||||
val channelTypedActors = typedSessionActors.remove(event.getChannel)
|
||||
if (channelTypedActors ne null) {
|
||||
val channelTypedActorsIterator = channelTypedActors.elements
|
||||
while (channelTypedActorsIterator.hasMoreElements) {
|
||||
TypedActor.stop(channelTypedActorsIterator.nextElement)
|
||||
}
|
||||
}
|
||||
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
|
||||
}
|
||||
|
||||
|
|
@ -510,7 +593,7 @@ class RemoteServerHandler(
|
|||
|
||||
val actorRef =
|
||||
try {
|
||||
createActor(actorInfo).start
|
||||
createActor(actorInfo, channel).start
|
||||
} catch {
|
||||
case e: SecurityException =>
|
||||
channel.write(createErrorReplyMessage(e, request, AkkaActorType.ScalaActor))
|
||||
|
|
@ -542,7 +625,7 @@ class RemoteServerHandler(
|
|||
val exception = f.exception
|
||||
|
||||
if (exception.isDefined) {
|
||||
log.debug("Returning exception from actor invocation [%s]".format(exception.get))
|
||||
log.debug("Returning exception from actor invocation [%s]",exception.get)
|
||||
try {
|
||||
channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
|
||||
} catch {
|
||||
|
|
@ -584,7 +667,7 @@ class RemoteServerHandler(
|
|||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
||||
|
||||
val typedActor = createTypedActor(actorInfo)
|
||||
val typedActor = createTypedActor(actorInfo, channel)
|
||||
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
|
||||
val argClasses = args.map(_.getClass)
|
||||
|
||||
|
|
@ -592,26 +675,35 @@ class RemoteServerHandler(
|
|||
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
|
||||
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
|
||||
else {
|
||||
val result = messageReceiver.invoke(typedActor, args: _*) match {
|
||||
case f: Future[_] => f.await.result.get //TODO replace this with a Listener on the Future to avoid blocking
|
||||
case other => other
|
||||
//Sends the response
|
||||
def sendResponse(result: Either[Any,Throwable]): Unit = try {
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
result,
|
||||
true,
|
||||
None,
|
||||
None,
|
||||
AkkaActorType.TypedActor,
|
||||
None)
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
channel.write(messageBuilder.build)
|
||||
log.debug("Returning result from remote typed actor invocation [%s]", result)
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
log.debug("Returning result from remote typed actor invocation [%s]", result)
|
||||
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Left(result),
|
||||
true,
|
||||
None,
|
||||
None,
|
||||
AkkaActorType.TypedActor,
|
||||
None)
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
channel.write(messageBuilder.build)
|
||||
messageReceiver.invoke(typedActor, args: _*) match {
|
||||
case f: Future[_] => //If it's a future, we can lift on that to defer the send to when the future is completed
|
||||
f.onComplete( future => {
|
||||
val result: Either[Any,Throwable] = if (future.exception.isDefined) Right(future.exception.get) else Left(future.result.get)
|
||||
sendResponse(result)
|
||||
})
|
||||
case other => sendResponse(Left(other))
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case e: InvocationTargetException =>
|
||||
|
|
@ -631,10 +723,26 @@ class RemoteServerHandler(
|
|||
server.actorsByUuid.get(uuid)
|
||||
}
|
||||
|
||||
private def findActorFactory(id: String) : () => ActorRef = {
|
||||
server.actorsFactories.get(id)
|
||||
}
|
||||
|
||||
private def findSessionActor(id: String, channel: Channel) : ActorRef = {
|
||||
sessionActors.get(channel).get(id)
|
||||
}
|
||||
|
||||
private def findTypedActorById(id: String) : AnyRef = {
|
||||
server.typedActors.get(id)
|
||||
}
|
||||
|
||||
private def findTypedActorFactory(id: String) : () => AnyRef = {
|
||||
server.typedActorsFactories.get(id)
|
||||
}
|
||||
|
||||
private def findTypedSessionActor(id: String, channel: Channel) : AnyRef = {
|
||||
typedSessionActors.get(channel).get(id)
|
||||
}
|
||||
|
||||
private def findTypedActorByUuid(uuid: String) : AnyRef = {
|
||||
server.typedActorsByUuid.get(uuid)
|
||||
}
|
||||
|
|
@ -653,6 +761,60 @@ class RemoteServerHandler(
|
|||
actorRefOrNull
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the actor from the session, or creates one if there is a factory for it
|
||||
*/
|
||||
private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
val sessionActorRefOrNull = findSessionActor(id, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
sessionActorRefOrNull
|
||||
else
|
||||
{
|
||||
// we dont have it in the session either, see if we have a factory for it
|
||||
val actorFactoryOrNull = findActorFactory(id)
|
||||
if (actorFactoryOrNull ne null) {
|
||||
val actorRef = actorFactoryOrNull()
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
sessionActors.get(channel).put(id, actorRef)
|
||||
actorRef
|
||||
}
|
||||
else
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private def createClientManagedActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
val timeout = actorInfo.getTimeout
|
||||
val name = actorInfo.getTarget
|
||||
|
||||
try {
|
||||
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
|
||||
log.info("Creating a new remote actor [%s:%s]", name, uuid)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
actorRef.id = id
|
||||
actorRef.timeout = timeout
|
||||
actorRef.remoteAddress = None
|
||||
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
|
||||
*
|
||||
|
|
@ -660,72 +822,91 @@ class RemoteServerHandler(
|
|||
*
|
||||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
|
||||
val name = actorInfo.getTarget
|
||||
val timeout = actorInfo.getTimeout
|
||||
|
||||
val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
|
||||
|
||||
if (actorRefOrNull eq null) {
|
||||
try {
|
||||
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
|
||||
log.info("Creating a new remote actor [%s:%s]", name, uuid)
|
||||
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
||||
else Class.forName(name)
|
||||
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
|
||||
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
||||
actorRef.id = id
|
||||
actorRef.timeout = timeout
|
||||
actorRef.remoteAddress = None
|
||||
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
||||
actorRef
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote actor instance")
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
} else actorRefOrNull
|
||||
if (actorRefOrNull ne null)
|
||||
actorRefOrNull
|
||||
else
|
||||
{
|
||||
// the actor has not been registered globally. See if we have it in the session
|
||||
val sessionActorRefOrNull = createSessionActor(actorInfo, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
sessionActorRefOrNull
|
||||
else // maybe it is a client managed actor
|
||||
createClientManagedActor(actorInfo)
|
||||
}
|
||||
}
|
||||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
||||
/**
|
||||
* gets the actor from the session, or creates one if there is a factory for it
|
||||
*/
|
||||
private def createTypedSessionActor(actorInfo: ActorInfoProtocol, channel: Channel):AnyRef ={
|
||||
val id = actorInfo.getId
|
||||
val sessionActorRefOrNull = findTypedSessionActor(id, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
sessionActorRefOrNull
|
||||
else {
|
||||
val actorFactoryOrNull = findTypedActorFactory(id)
|
||||
if (actorFactoryOrNull ne null) {
|
||||
val newInstance = actorFactoryOrNull()
|
||||
typedSessionActors.get(channel).put(id, newInstance)
|
||||
newInstance
|
||||
}
|
||||
else
|
||||
null
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private def createClientManagedTypedActor(actorInfo: ActorInfoProtocol) = {
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
val interfaceClassname = typedActorInfo.getInterface
|
||||
val targetClassname = actorInfo.getTarget
|
||||
val uuid = actorInfo.getUuid
|
||||
|
||||
try {
|
||||
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
|
||||
log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname)
|
||||
|
||||
val (interfaceClass, targetClass) =
|
||||
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
|
||||
applicationLoader.get.loadClass(targetClassname))
|
||||
else (Class.forName(interfaceClassname), Class.forName(targetClassname))
|
||||
|
||||
val newInstance = TypedActor.newInstance(
|
||||
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
|
||||
server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote typed actor instance")
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
private def createTypedActor(actorInfo: ActorInfoProtocol, channel: Channel): AnyRef = {
|
||||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
|
||||
val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
|
||||
|
||||
if (typedActorOrNull eq null) {
|
||||
val typedActorInfo = actorInfo.getTypedActorInfo
|
||||
val interfaceClassname = typedActorInfo.getInterface
|
||||
val targetClassname = actorInfo.getTarget
|
||||
|
||||
try {
|
||||
if (RemoteServer.UNTRUSTED_MODE) throw new SecurityException(
|
||||
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
|
||||
|
||||
log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname)
|
||||
|
||||
val (interfaceClass, targetClass) =
|
||||
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
|
||||
applicationLoader.get.loadClass(targetClassname))
|
||||
else (Class.forName(interfaceClassname), Class.forName(targetClassname))
|
||||
|
||||
val newInstance = TypedActor.newInstance(
|
||||
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
|
||||
server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid
|
||||
newInstance
|
||||
} catch {
|
||||
case e =>
|
||||
log.error(e, "Could not create remote typed actor instance")
|
||||
server.notifyListeners(RemoteServerError(e, server))
|
||||
throw e
|
||||
}
|
||||
} else typedActorOrNull
|
||||
if (typedActorOrNull ne null)
|
||||
typedActorOrNull
|
||||
else
|
||||
{
|
||||
// the actor has not been registered globally. See if we have it in the session
|
||||
val sessionActorRefOrNull = createTypedSessionActor(actorInfo, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
sessionActorRefOrNull
|
||||
else // maybe it is a client managed actor
|
||||
createClientManagedTypedActor(actorInfo)
|
||||
}
|
||||
}
|
||||
|
||||
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,8 @@
|
|||
package akka.actor;
|
||||
|
||||
public interface RemoteTypedSessionActor {
|
||||
|
||||
public void login(String user);
|
||||
public String getUser();
|
||||
public void doSomethingFunny() throws Exception;
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
package akka.actor.remote;
|
||||
|
||||
import akka.actor.*;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class RemoteTypedSessionActorImpl extends TypedActor implements RemoteTypedSessionActor {
|
||||
|
||||
|
||||
private static Set<RemoteTypedSessionActor> instantiatedSessionActors = new HashSet<RemoteTypedSessionActor>();
|
||||
|
||||
public static Set<RemoteTypedSessionActor> getInstances() {
|
||||
return instantiatedSessionActors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
instantiatedSessionActors.add(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
instantiatedSessionActors.remove(this);
|
||||
}
|
||||
|
||||
|
||||
private String user="anonymous";
|
||||
|
||||
@Override
|
||||
public void login(String user) {
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUser()
|
||||
{
|
||||
return this.user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doSomethingFunny() throws Exception
|
||||
{
|
||||
throw new Exception("Bad boy");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -3,7 +3,8 @@ package akka.actor.remote
|
|||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.{Test, Before, After}
|
||||
|
||||
import akka.util._
|
||||
|
||||
import akka.remote.{RemoteServer, RemoteClient}
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{ActorRegistry, ActorRef, Actor}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,162 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor.remote
|
||||
|
||||
import org.scalatest._
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.remote.{RemoteServer, RemoteClient}
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import RemoteTypedActorLog._
|
||||
|
||||
object ServerInitiatedRemoteSessionActorSpec {
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 9990
|
||||
var server: RemoteServer = null
|
||||
|
||||
case class Login(user:String)
|
||||
case class GetUser()
|
||||
case class DoSomethingFunny()
|
||||
|
||||
var instantiatedSessionActors= Set[ActorRef]()
|
||||
|
||||
class RemoteStatefullSessionActorSpec extends Actor {
|
||||
|
||||
var user : String= "anonymous"
|
||||
|
||||
override def preStart = {
|
||||
instantiatedSessionActors += self
|
||||
}
|
||||
|
||||
override def postStop = {
|
||||
instantiatedSessionActors -= self
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Login(user) =>
|
||||
this.user = user
|
||||
case GetUser() =>
|
||||
self.reply(this.user)
|
||||
case DoSomethingFunny() =>
|
||||
throw new Exception("Bad boy")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ServerInitiatedRemoteSessionActorSpec extends
|
||||
FlatSpec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterEach {
|
||||
import ServerInitiatedRemoteSessionActorSpec._
|
||||
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
|
||||
override def beforeEach = {
|
||||
server = new RemoteServer()
|
||||
server.start(HOSTNAME, PORT)
|
||||
|
||||
server.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec])
|
||||
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
|
||||
// make sure the servers shutdown cleanly after the test has finished
|
||||
override def afterEach = {
|
||||
try {
|
||||
server.shutdown
|
||||
RemoteClient.shutdownAll
|
||||
Thread.sleep(1000)
|
||||
} catch {
|
||||
case e => ()
|
||||
}
|
||||
}
|
||||
|
||||
"A remote session Actor" should "create a new session actor per connection" in {
|
||||
clearMessageLogs
|
||||
|
||||
val session1 = RemoteClient.actorFor(
|
||||
"untyped-session-actor-service",
|
||||
5000L,
|
||||
HOSTNAME, PORT)
|
||||
|
||||
val default1 = session1 !! GetUser()
|
||||
default1.get.asInstanceOf[String] should equal ("anonymous")
|
||||
session1 ! Login("session[1]")
|
||||
val result1 = session1 !! GetUser()
|
||||
result1.get.asInstanceOf[String] should equal ("session[1]")
|
||||
|
||||
session1.stop()
|
||||
|
||||
RemoteClient.shutdownAll
|
||||
|
||||
//RemoteClient.clientFor(HOSTNAME, PORT).connect
|
||||
|
||||
val session2 = RemoteClient.actorFor(
|
||||
"untyped-session-actor-service",
|
||||
5000L,
|
||||
HOSTNAME, PORT)
|
||||
|
||||
// since this is a new session, the server should reset the state
|
||||
val default2 = session2 !! GetUser()
|
||||
default2.get.asInstanceOf[String] should equal ("anonymous")
|
||||
|
||||
session2.stop()
|
||||
|
||||
}
|
||||
|
||||
it should "stop the actor when the client disconnects" in {
|
||||
|
||||
val session1 = RemoteClient.actorFor(
|
||||
"untyped-session-actor-service",
|
||||
5000L,
|
||||
HOSTNAME, PORT)
|
||||
|
||||
|
||||
val default1 = session1 !! GetUser()
|
||||
default1.get.asInstanceOf[String] should equal ("anonymous")
|
||||
|
||||
instantiatedSessionActors should have size (1)
|
||||
|
||||
RemoteClient.shutdownAll
|
||||
Thread.sleep(1000)
|
||||
instantiatedSessionActors should have size (0)
|
||||
|
||||
}
|
||||
|
||||
it should "stop the actor when there is an error" in {
|
||||
|
||||
val session1 = RemoteClient.actorFor(
|
||||
"untyped-session-actor-service",
|
||||
5000L,
|
||||
HOSTNAME, PORT)
|
||||
|
||||
|
||||
session1 ! DoSomethingFunny()
|
||||
session1.stop()
|
||||
|
||||
Thread.sleep(1000)
|
||||
|
||||
instantiatedSessionActors should have size (0)
|
||||
}
|
||||
|
||||
|
||||
it should "be able to unregister" in {
|
||||
server.registerPerSession("my-service-1", actorOf[RemoteStatefullSessionActorSpec])
|
||||
server.actorsFactories.get("my-service-1") should not be (null)
|
||||
server.unregisterPerSession("my-service-1")
|
||||
server.actorsFactories.get("my-service-1") should be (null)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor.remote
|
||||
|
||||
import org.scalatest._
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.remote.{RemoteServer, RemoteClient}
|
||||
import akka.actor._
|
||||
import RemoteTypedActorLog._
|
||||
|
||||
object ServerInitiatedRemoteTypedSessionActorSpec {
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 9990
|
||||
var server: RemoteServer = null
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ServerInitiatedRemoteTypedSessionActorSpec extends
|
||||
FlatSpec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterEach {
|
||||
import ServerInitiatedRemoteTypedActorSpec._
|
||||
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
|
||||
override def beforeEach = {
|
||||
server = new RemoteServer()
|
||||
server.start(HOSTNAME, PORT)
|
||||
|
||||
server.registerTypedPerSessionActor("typed-session-actor-service",
|
||||
TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000))
|
||||
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
|
||||
// make sure the servers shutdown cleanly after the test has finished
|
||||
override def afterEach = {
|
||||
try {
|
||||
server.shutdown
|
||||
RemoteClient.shutdownAll
|
||||
Thread.sleep(1000)
|
||||
} catch {
|
||||
case e => ()
|
||||
}
|
||||
}
|
||||
|
||||
"A remote session Actor" should "create a new session actor per connection" in {
|
||||
clearMessageLogs
|
||||
|
||||
val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
|
||||
|
||||
session1.getUser() should equal ("anonymous")
|
||||
session1.login("session[1]")
|
||||
session1.getUser() should equal ("session[1]")
|
||||
|
||||
RemoteClient.shutdownAll
|
||||
|
||||
val session2 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
|
||||
|
||||
session2.getUser() should equal ("anonymous")
|
||||
|
||||
}
|
||||
|
||||
it should "stop the actor when the client disconnects" in {
|
||||
|
||||
val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
|
||||
|
||||
session1.getUser() should equal ("anonymous")
|
||||
|
||||
RemoteTypedSessionActorImpl.getInstances() should have size (1)
|
||||
RemoteClient.shutdownAll
|
||||
Thread.sleep(1000)
|
||||
RemoteTypedSessionActorImpl.getInstances() should have size (0)
|
||||
|
||||
}
|
||||
|
||||
it should "stop the actor when there is an error" in {
|
||||
|
||||
val session1 = RemoteClient.typedActorFor(classOf[RemoteTypedSessionActor], "typed-session-actor-service", 5000L, HOSTNAME, PORT)
|
||||
|
||||
session1.doSomethingFunny()
|
||||
|
||||
RemoteClient.shutdownAll
|
||||
Thread.sleep(1000)
|
||||
RemoteTypedSessionActorImpl.getInstances() should have size (0)
|
||||
|
||||
}
|
||||
|
||||
|
||||
it should "be able to unregister" in {
|
||||
server.registerTypedPerSessionActor("my-service-1",TypedActor.newInstance(classOf[RemoteTypedSessionActor], classOf[RemoteTypedSessionActorImpl], 1000))
|
||||
|
||||
server.typedActorsFactories.get("my-service-1") should not be (null)
|
||||
server.unregisterTypedPerSessionActor("my-service-1")
|
||||
server.typedActorsFactories.get("my-service-1") should be (null)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
61
akka-remote/src/test/scala/ticket/Ticket506Spec.scala
Normal file
61
akka-remote/src/test/scala/ticket/Ticket506Spec.scala
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
package ticket
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
|
||||
import akka.remote.{RemoteClient, RemoteNode, RemoteServer}
|
||||
import akka.actor.{Actor, ActorRef}
|
||||
import akka.serialization.RemoteActorSerialization
|
||||
import akka.actor.Actor.actorOf
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
|
||||
object State {
|
||||
val latch = new CountDownLatch(1)
|
||||
}
|
||||
|
||||
case class RecvActorRef(bytes:Array[Byte])
|
||||
|
||||
class ActorRefService extends Actor {
|
||||
import self._
|
||||
|
||||
def receive:Receive = {
|
||||
case RecvActorRef(bytes) =>
|
||||
val ref = RemoteActorSerialization.fromBinaryToRemoteActorRef(bytes)
|
||||
ref ! "hello"
|
||||
case "hello" =>
|
||||
State.latch.countDown
|
||||
}
|
||||
}
|
||||
|
||||
class Ticket506Spec extends Spec with ShouldMatchers {
|
||||
val hostname:String = "localhost"
|
||||
val port:Int = 9440
|
||||
|
||||
describe("a RemoteActorRef serialized") {
|
||||
it("should be remotely usable") {
|
||||
val s1,s2 = new RemoteServer
|
||||
s1.start(hostname, port)
|
||||
s2.start(hostname, port + 1)
|
||||
|
||||
val a1,a2 = actorOf[ActorRefService]
|
||||
a1.homeAddress = (hostname, port)
|
||||
a2.homeAddress = (hostname, port+1)
|
||||
|
||||
s1.register("service", a1)
|
||||
s2.register("service", a2)
|
||||
|
||||
// connect to the first server/service
|
||||
val c1 = RemoteClient.actorFor("service", hostname, port)
|
||||
|
||||
val bytes = RemoteActorSerialization.toRemoteActorRefProtocol(a2).toByteArray
|
||||
c1 ! RecvActorRef(bytes)
|
||||
|
||||
State.latch.await(1000, TimeUnit.MILLISECONDS) should be(true)
|
||||
|
||||
RemoteClient.shutdownAll
|
||||
s1.shutdown
|
||||
s2.shutdown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -59,7 +59,7 @@ trait AkkaProject extends AkkaBaseProject {
|
|||
val akkaVersion = "1.0-SNAPSHOT"
|
||||
|
||||
// convenience method
|
||||
def akkaModule(module: String) = "akka" %% ("akka-" + module) % akkaVersion
|
||||
def akkaModule(module: String) = "se.scalablesolutions.akka" % ("akka-" + module) % akkaVersion
|
||||
|
||||
// akka actor dependency by default
|
||||
val akkaActor = akkaModule("actor")
|
||||
|
|
|
|||
|
|
@ -313,7 +313,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// Miscellaneous
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
override def artifactID: String = this.name
|
||||
override def disableCrossPaths = true
|
||||
|
||||
override def mainClass = Some("akka.kernel.Main")
|
||||
|
||||
|
|
@ -383,7 +383,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
override def pomExtra =
|
||||
<inceptionYear>2009</inceptionYear>
|
||||
<url>http://akkasource.org</url>
|
||||
<url>http://akka.io</url>
|
||||
<organization>
|
||||
<name>Scalable Solutions AB</name>
|
||||
<url>http://scalablesolutions.se</url>
|
||||
|
|
@ -956,7 +956,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
// ------------------------------------------------------------
|
||||
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject {
|
||||
override def artifactID: String = this.name
|
||||
override def disableCrossPaths = true
|
||||
lazy val sourceArtifact = Artifact(this.artifactID, "source", "jar", Some("sources"), Nil, None)
|
||||
lazy val docsArtifact = Artifact(this.artifactID, "doc", "jar", Some("docs"), Nil, None)
|
||||
override def runClasspath = super.runClasspath +++ (AkkaParentProject.this.info.projectPath / "config")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue