Fixed bug in ActorRegistry getting typed actor by manifest

This commit is contained in:
Jonas Bonér 2010-11-22 08:59:45 +01:00
parent 5820286a58
commit 80adb71850
6 changed files with 106 additions and 106 deletions

View file

@ -50,7 +50,7 @@ object ActorRegistry extends ListenerManagement {
/** /**
* Returns the number of actors in the system. * Returns the number of actors in the system.
*/ */
def size : Int = actorsByUUID.size def size: Int = actorsByUUID.size
/** /**
* Invokes a function for all actors. * Invokes a function for all actors.
@ -68,8 +68,7 @@ object ActorRegistry extends ListenerManagement {
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) { while (elements.hasMoreElements) {
val element = elements.nextElement val element = elements.nextElement
if(f isDefinedAt element) if (f isDefinedAt element) return Some(f(element))
return Some(f(element))
} }
None None
} }
@ -88,9 +87,7 @@ object ActorRegistry extends ListenerManagement {
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) { while (elements.hasMoreElements) {
val actorId = elements.nextElement val actorId = elements.nextElement
if (p(actorId)) { if (p(actorId)) all += actorId
all += actorId
}
} }
all.toArray all.toArray
} }
@ -105,7 +102,7 @@ object ActorRegistry extends ListenerManagement {
* Finds any actor that matches T. * Finds any actor that matches T.
*/ */
def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] = def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
find({ case a:ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a }) find({ case a: ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a })
/** /**
* Finds all actors of type or sub-type specified by the class passed in as the Class argument. * Finds all actors of type or sub-type specified by the class passed in as the Class argument.
@ -132,13 +129,11 @@ object ActorRegistry extends ListenerManagement {
* Invokes a function for all typed actors. * Invokes a function for all typed actors.
*/ */
def foreachTypedActor(f: (AnyRef) => Unit) = { def foreachTypedActor(f: (AnyRef) => Unit) = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) { while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement) val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined) { if (proxy.isDefined) f(proxy.get)
f(proxy.get)
}
} }
} }
@ -147,12 +142,11 @@ object ActorRegistry extends ListenerManagement {
* Returns None if the function never returns Some * Returns None if the function never returns Some
*/ */
def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = { def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) { while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement) val proxy = typedActorFor(elements.nextElement)
if(proxy.isDefined && (f isDefinedAt proxy)) if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy))
return Some(f(proxy))
} }
None None
} }
@ -161,14 +155,12 @@ object ActorRegistry extends ListenerManagement {
* Finds all typed actors that satisfy a predicate. * Finds all typed actors that satisfy a predicate.
*/ */
def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = { def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
val all = new ListBuffer[AnyRef] val all = new ListBuffer[AnyRef]
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) { while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement) val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined && p(proxy.get)) { if (proxy.isDefined && p(proxy.get)) all += proxy.get
all += proxy.get
}
} }
all.toArray all.toArray
} }
@ -177,7 +169,7 @@ object ActorRegistry extends ListenerManagement {
* Finds all typed actors that are subtypes of the class passed in as the Manifest argument. * Finds all typed actors that are subtypes of the class passed in as the Manifest argument.
*/ */
def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = { def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]])
} }
@ -185,20 +177,20 @@ object ActorRegistry extends ListenerManagement {
* Finds any typed actor that matches T. * Finds any typed actor that matches T.
*/ */
def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = { def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
def predicate(proxy: AnyRef) : Boolean = { def predicate(proxy: AnyRef): Boolean = {
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass) actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass)
} }
findTypedActor({ case a:AnyRef if predicate(a) => a }) findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a })
} }
/** /**
* Finds all typed actors of type or sub-type specified by the class passed in as the Class argument. * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
*/ */
def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = { def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
def predicate(proxy: AnyRef) : Boolean = { def predicate(proxy: AnyRef): Boolean = {
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass) actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
} }
@ -209,7 +201,7 @@ object ActorRegistry extends ListenerManagement {
* Finds all typed actors that have a specific id. * Finds all typed actors that have a specific id.
*/ */
def typedActorsFor(id: String): Array[AnyRef] = { def typedActorsFor(id: String): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
val actorRefs = actorsById values id val actorRefs = actorsById values id
actorRefs.flatMap(typedActorFor(_)) actorRefs.flatMap(typedActorFor(_))
} }
@ -218,12 +210,10 @@ object ActorRegistry extends ListenerManagement {
* Finds the typed actor that has a specific UUID. * Finds the typed actor that has a specific UUID.
*/ */
def typedActorFor(uuid: Uuid): Option[AnyRef] = { def typedActorFor(uuid: Uuid): Option[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled TypedActorModule.ensureEnabled
val actorRef = actorsByUUID get uuid val actorRef = actorsByUUID get uuid
if (actorRef eq null) if (actorRef eq null) None
None else typedActorFor(actorRef)
else
typedActorFor(actorRef)
} }
/** /**
@ -265,20 +255,15 @@ object ActorRegistry extends ListenerManagement {
*/ */
def shutdownAll() { def shutdownAll() {
log.info("Shutting down all actors in the system...") log.info("Shutting down all actors in the system...")
if (TypedActorModule.isTypedActorEnabled) { if (TypedActorModule.isEnabled) {
val elements = actorsByUUID.elements val elements = actorsByUUID.elements
while (elements.hasMoreElements) { while (elements.hasMoreElements) {
val actorRef = elements.nextElement val actorRef = elements.nextElement
val proxy = typedActorFor(actorRef) val proxy = typedActorFor(actorRef)
if (proxy.isDefined) { if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) else actorRef.stop
} else {
actorRef.stop
}
}
} else {
foreach(_.stop)
} }
} else foreach(_.stop)
actorsByUUID.clear actorsByUUID.clear
actorsById.clear actorsById.clear
log.info("All actors have been shut down and unregistered from ActorRegistry") log.info("All actors have been shut down and unregistered from ActorRegistry")
@ -337,16 +322,13 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
if (set ne null) { if (set ne null) {
set.synchronized { set.synchronized {
if (set.isEmpty) { if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
retry = true //IF the set is empty then it has been removed, so signal retry
}
else { //Else add the value to the set and signal that retry is not needed else { //Else add the value to the set and signal that retry is not needed
added = set add v added = set add v
retry = false retry = false
} }
} }
} } else {
else {
val newSet = new ConcurrentSkipListSet[V] val newSet = new ConcurrentSkipListSet[V]
newSet add v newSet add v
@ -354,24 +336,20 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
val oldSet = container.putIfAbsent(k,newSet) val oldSet = container.putIfAbsent(k,newSet)
if (oldSet ne null) { if (oldSet ne null) {
oldSet.synchronized { oldSet.synchronized {
if (oldSet.isEmpty) { if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
retry = true //IF the set is empty then it has been removed, so signal retry
}
else { //Else try to add the value to the set and signal that retry is not needed else { //Else try to add the value to the set and signal that retry is not needed
added = oldSet add v added = oldSet add v
retry = false retry = false
} }
} }
} else { } else added = true
added = true
}
} }
if (retry) spinPut(k,v) if (retry) spinPut(k, v)
else added else added
} }
spinPut(key,value) spinPut(key, value)
} }
/** /**
@ -390,10 +368,8 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
def findValue(key: K)(f: (V) => Boolean): Option[V] = { def findValue(key: K)(f: (V) => Boolean): Option[V] = {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
val set = container get key val set = container get key
if (set ne null) if (set ne null) set.iterator.find(f)
set.iterator.find(f) else None
else
None
} }
/** /**
@ -420,8 +396,7 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set
true //Remove succeeded true //Remove succeeded
} } else false //Remove failed
else false //Remove failed
} }
} else false //Remove failed } else false //Remove failed
} }
@ -434,5 +409,5 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
/** /**
* Removes all keys and all values * Removes all keys and all values
*/ */
def clear = foreach { case (k,v) => remove(k,v) } def clear = foreach { case (k, v) => remove(k, v) }
} }

View file

@ -5,7 +5,7 @@
package akka.dispatch package akka.dispatch
import akka.actor.{ActorRef, IllegalActorStateException} import akka.actor.{ActorRef, IllegalActorStateException}
import akka.util.ReflectiveAccess.EnterpriseModule import akka.util.ReflectiveAccess.AkkaCloudModule
import java.util.Queue import java.util.Queue
import akka.util.Switch import akka.util.Switch
@ -123,10 +123,10 @@ class ExecutorBasedEventDrivenDispatcher(
*/ */
def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match {
// FIXME make generic (work for TypedActor as well) // FIXME make generic (work for TypedActor as well)
case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] case FileBasedDurableMailbox(serializer) => AkkaCloudModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] case ZooKeeperBasedDurableMailbox(serializer) => AkkaCloudModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] case BeanstalkBasedDurableMailbox(serializer) => AkkaCloudModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] case RedisBasedDurableMailbox(serializer) => AkkaCloudModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported")
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
} }

View file

@ -5,7 +5,7 @@
package akka.dispatch package akka.dispatch
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import akka.util.ReflectiveAccess.EnterpriseModule import akka.util.ReflectiveAccess.AkkaCloudModule
import akka.AkkaException import akka.AkkaException
import java.util.{Queue, List} import java.util.{Queue, List}
@ -42,15 +42,15 @@ case class BoundedMailbox(
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
} }
abstract class DurableMailboxType(val serializer: EnterpriseModule.Serializer) extends MailboxType { abstract class DurableMailboxType(val serializer: AkkaCloudModule.Serializer) extends MailboxType {
if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null") if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null")
} }
case class FileBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class FileBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class RedisBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class RedisBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class BeanstalkBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class BeanstalkBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class ZooKeeperBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class ZooKeeperBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class AMQPBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class AMQPBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class JMSBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser) case class JMSBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
class DefaultUnboundedMessageQueue(blockDequeue: Boolean) class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {

View file

@ -20,13 +20,13 @@ object ReflectiveAccess extends Logging {
val loader = getClass.getClassLoader val loader = getClass.getClassLoader
lazy val isRemotingEnabled = RemoteClientModule.isRemotingEnabled lazy val isRemotingEnabled = RemoteClientModule.isEnabled
lazy val isTypedActorEnabled = TypedActorModule.isTypedActorEnabled lazy val isTypedActorEnabled = TypedActorModule.isEnabled
lazy val isEnterpriseEnabled = EnterpriseModule.isEnterpriseEnabled lazy val isAkkaCloudEnabled = AkkaCloudModule.isEnabled
def ensureRemotingEnabled = RemoteClientModule.ensureRemotingEnabled def ensureRemotingEnabled = RemoteClientModule.ensureEnabled
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled def ensureTypedActorEnabled = TypedActorModule.ensureEnabled
def ensureEnterpriseEnabled = EnterpriseModule.ensureEnterpriseEnabled def ensureAkkaCloudEnabled = AkkaCloudModule.ensureEnabled
/** /**
* Reflective access to the RemoteClient module. * Reflective access to the RemoteClient module.
@ -56,32 +56,32 @@ object ReflectiveAccess extends Logging {
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
} }
lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined lazy val isEnabled = remoteClientObjectInstance.isDefined
def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException( def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath") "Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
val remoteClientObjectInstance: Option[RemoteClientObject] = val remoteClientObjectInstance: Option[RemoteClientObject] =
getObjectFor("akka.remote.RemoteClient$") getObjectFor("akka.remote.RemoteClient$")
def register(address: InetSocketAddress, uuid: Uuid) = { def register(address: InetSocketAddress, uuid: Uuid) = {
ensureRemotingEnabled ensureEnabled
remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid) remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid)
} }
def unregister(address: InetSocketAddress, uuid: Uuid) = { def unregister(address: InetSocketAddress, uuid: Uuid) = {
ensureRemotingEnabled ensureEnabled
remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid) remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid)
} }
def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = { def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = {
ensureRemotingEnabled ensureEnabled
val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress) val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress)
remoteClient.registerSupervisorForActor(actorRef) remoteClient.registerSupervisorForActor(actorRef)
} }
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = { def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = {
ensureRemotingEnabled ensureEnabled
remoteClientObjectInstance.get.clientFor(hostname, port, loader) remoteClientObjectInstance.get.clientFor(hostname, port, loader)
} }
@ -95,7 +95,7 @@ object ReflectiveAccess extends Logging {
actorRef: ActorRef, actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]], typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]] = { actorType: ActorType): Option[CompletableFuture[T]] = {
ensureRemotingEnabled ensureEnabled
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T]( clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
} }
@ -126,17 +126,17 @@ object ReflectiveAccess extends Logging {
getObjectFor("akka.remote.RemoteNode$") getObjectFor("akka.remote.RemoteNode$")
def registerActor(address: InetSocketAddress, actorRef: ActorRef) = { def registerActor(address: InetSocketAddress, actorRef: ActorRef) = {
ensureRemotingEnabled RemoteClientModule.ensureEnabled
remoteServerObjectInstance.get.registerActor(address, actorRef) remoteServerObjectInstance.get.registerActor(address, actorRef)
} }
def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = { def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = {
ensureRemotingEnabled RemoteClientModule.ensureEnabled
remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy) remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy)
} }
def unregister(actorRef: ActorRef) = { def unregister(actorRef: ActorRef) = {
ensureRemotingEnabled RemoteClientModule.ensureEnabled
remoteNodeObjectInstance.get.unregister(actorRef) remoteNodeObjectInstance.get.unregister(actorRef)
} }
} }
@ -156,16 +156,16 @@ object ReflectiveAccess extends Logging {
def stop(anyRef: AnyRef) : Unit def stop(anyRef: AnyRef) : Unit
} }
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined lazy val isEnabled = typedActorObjectInstance.isDefined
def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException( def ensureEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
"Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath") "Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath")
val typedActorObjectInstance: Option[TypedActorObject] = val typedActorObjectInstance: Option[TypedActorObject] =
getObjectFor("akka.actor.TypedActor$") getObjectFor("akka.actor.TypedActor$")
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = { def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
ensureTypedActorEnabled ensureEnabled
if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) { if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) {
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None) future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
} }
@ -173,7 +173,7 @@ object ReflectiveAccess extends Logging {
} }
} }
object EnterpriseModule { object AkkaCloudModule {
type Mailbox = { type Mailbox = {
def enqueue(message: MessageInvocation) def enqueue(message: MessageInvocation)
@ -185,27 +185,27 @@ object ReflectiveAccess extends Logging {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
} }
lazy val isEnterpriseEnabled = clusterObjectInstance.isDefined lazy val isEnabled = clusterObjectInstance.isDefined
val clusterObjectInstance: Option[AnyRef] = val clusterObjectInstance: Option[AnyRef] =
getObjectFor("akka.cluster.Cluster$") getObjectFor("akka.cloud.cluster.Cluster$")
val serializerClass: Option[Class[_]] = val serializerClass: Option[Class[_]] =
getClassFor("akka.serialization.Serializer") getClassFor("akka.serialization.Serializer")
def ensureEnterpriseEnabled = if (!isEnterpriseEnabled) throw new ModuleNotAvailableException( def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
"Feature is only available in Akka Enterprise edition") "Feature is only available in Akka Cloud")
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.FileBasedMailbox", actorRef) def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.FileBasedMailbox", actorRef)
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.ZooKeeperBasedMailbox", actorRef) def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.ZooKeeperBasedMailbox", actorRef)
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.BeanstalkBasedMailbox", actorRef) def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.BeanstalkBasedMailbox", actorRef)
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.RedisBasedMailbox", actorRef) def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.RedisBasedMailbox", actorRef)
private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = { private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = {
ensureEnterpriseEnabled ensureEnabled
createInstance( createInstance(
mailboxClassname, mailboxClassname,
Array(classOf[ActorRef]), Array(classOf[ActorRef]),

View file

@ -27,7 +27,6 @@ object ActorRegistrySpec {
self.reply("got ping") self.reply("got ping")
} }
} }
} }
class ActorRegistrySpec extends JUnitSuite { class ActorRegistrySpec extends JUnitSuite {

View file

@ -0,0 +1,26 @@
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
object TypedActorRegistrySpec {
trait My
class MyImpl extends TypedActor with My
}
class TypedActorRegistrySpec extends JUnitSuite {
import TypedActorRegistrySpec._
@Test def shouldGetTypedActorByClassFromActorRegistry {
ActorRegistry.shutdownAll
val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000)
val actors = ActorRegistry.typedActorsFor(classOf[My])
assert(actors.length === 1)
val option = ActorRegistry.typedActorFor[My]
assert(option != null)
assert(option.isDefined)
ActorRegistry.shutdownAll
}
}