Merge branch 'master' of github.com:jboner/akka into ticket-538

This commit is contained in:
ticktock 2010-11-22 19:55:26 -05:00
commit b6502bec6f
184 changed files with 829 additions and 231 deletions

View file

@ -34,8 +34,16 @@ abstract class RemoteActor(address: InetSocketAddress) extends Actor {
*/
@serializable sealed trait LifeCycleMessage
case class HotSwap(code: Actor.Receive) extends LifeCycleMessage {
def this(behavior: Procedure[Any]) = this({ case msg => behavior.apply(msg) }: Actor.Receive)
case class HotSwap(code: ActorRef => Actor.Receive) extends LifeCycleMessage {
/**
* Java API
*/
def this(code: akka.japi.Function[ActorRef,Procedure[Any]]) =
this( (self: ActorRef) => {
val behavior = code(self)
val result: Actor.Receive = { case msg => behavior(msg) }
result
})
}
case object RevertHotSwap extends LifeCycleMessage
@ -66,7 +74,7 @@ class ActorInitializationException private[akka](message: String) extends AkkaEx
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
/**
* This message is thrown by default when an Actors behavior doesn't match a message
* This message is thrown by default when an Actors behavior doesn't match a message
*/
case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception {
override def getMessage() = "Actor %s does not handle [%s]".format(ref,msg)
@ -426,7 +434,7 @@ trait Actor extends Logging {
private lazy val processingBehavior: Receive = {
lazy val defaultBehavior = receive
val actorBehavior: Receive = {
case HotSwap(code) => become(code)
case HotSwap(code) => become(code(self))
case RevertHotSwap => unbecome
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
@ -444,7 +452,7 @@ trait Actor extends Logging {
private lazy val fullBehavior: Receive = {
lazy val defaultBehavior = receive
val actorBehavior: Receive = {
case HotSwap(code) => become(code)
case HotSwap(code) => become(code(self))
case RevertHotSwap => unbecome
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
@ -139,8 +139,24 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
def setFaultHandler(handler: FaultHandlingStrategy)
def getFaultHandler(): FaultHandlingStrategy
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
/**
* Akka Java API
* A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent)
* <p/>
* Can be one of:
*
* import static akka.config.Supervision.*;
* <pre>
* getContext().setLifeCycle(permanent());
* </pre>
* Or:
* <pre>
* getContext().setLifeCycle(temporary());
* </pre>
*/
def setLifeCycle(lifeCycle: LifeCycle): Unit
def getLifeCycle(): LifeCycle
/**
* Akka Java API
@ -597,6 +613,8 @@ class LocalActorRef private[akka] (
private var restartsWithinTimeRangeTimestamp: Long = 0L
@volatile
private var _mailbox: AnyRef = _
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
@ -1208,6 +1226,8 @@ private[akka] case class RemoteActorRef private[akka] (
ensureRemotingEnabled
val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port))
id = classOrServiceName
timeout = _timeout
@ -1242,8 +1262,6 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
val remoteAddress: Option[InetSocketAddress] = Some(new InetSocketAddress(hostname, port))
// ==== NOT SUPPORTED ====
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
@ -1313,9 +1331,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
*/
def id: String
def id_=(id: String): Unit
/**
def id_=(id: String): Unit /**
* User overridable callback/setting.
* <p/>
* Defines the life-cycle for a supervised actor.
@ -1522,5 +1538,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* Abstraction for unification of sender and senderFuture for later reply
*/
abstract class Channel[T] {
/**
* Sends the specified message to the channel
* Scala API
*/
def !(msg: T): Unit
/**
* Sends the specified message to the channel
* Java API
*/
def sendOneWay(msg: T): Unit = this.!(msg)
}

View file

@ -37,10 +37,10 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
private val actorsById = new Index[String,ActorRef]
private val remoteActorSets = Map[Address, RemoteActorSet]()
private val guard = new ReadWriteGuard
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
private val actorsById = new Index[String,ActorRef]
private val remoteActorSets = Map[Address, RemoteActorSet]()
private val guard = new ReadWriteGuard
/**
* Returns all actors in the system.
@ -50,7 +50,7 @@ object ActorRegistry extends ListenerManagement {
/**
* Returns the number of actors in the system.
*/
def size : Int = actorsByUUID.size
def size: Int = actorsByUUID.size
/**
* Invokes a function for all actors.
@ -68,8 +68,7 @@ object ActorRegistry extends ListenerManagement {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val element = elements.nextElement
if(f isDefinedAt element)
return Some(f(element))
if (f isDefinedAt element) return Some(f(element))
}
None
}
@ -88,9 +87,7 @@ object ActorRegistry extends ListenerManagement {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actorId = elements.nextElement
if (p(actorId)) {
all += actorId
}
if (p(actorId)) all += actorId
}
all.toArray
}
@ -105,7 +102,7 @@ object ActorRegistry extends ListenerManagement {
* Finds any actor that matches T.
*/
def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
find({ case a:ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a })
find({ case a: ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a })
/**
* Finds all actors of type or sub-type specified by the class passed in as the Class argument.
@ -132,13 +129,11 @@ object ActorRegistry extends ListenerManagement {
* Invokes a function for all typed actors.
*/
def foreachTypedActor(f: (AnyRef) => Unit) = {
TypedActorModule.ensureTypedActorEnabled
TypedActorModule.ensureEnabled
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined) {
f(proxy.get)
}
if (proxy.isDefined) f(proxy.get)
}
}
@ -147,12 +142,11 @@ object ActorRegistry extends ListenerManagement {
* Returns None if the function never returns Some
*/
def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = {
TypedActorModule.ensureTypedActorEnabled
TypedActorModule.ensureEnabled
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if(proxy.isDefined && (f isDefinedAt proxy))
return Some(f(proxy))
if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy))
}
None
}
@ -161,14 +155,12 @@ object ActorRegistry extends ListenerManagement {
* Finds all typed actors that satisfy a predicate.
*/
def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
TypedActorModule.ensureEnabled
val all = new ListBuffer[AnyRef]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined && p(proxy.get)) {
all += proxy.get
}
if (proxy.isDefined && p(proxy.get)) all += proxy.get
}
all.toArray
}
@ -177,7 +169,7 @@ object ActorRegistry extends ListenerManagement {
* Finds all typed actors that are subtypes of the class passed in as the Manifest argument.
*/
def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
TypedActorModule.ensureEnabled
typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]])
}
@ -185,20 +177,20 @@ object ActorRegistry extends ListenerManagement {
* Finds any typed actor that matches T.
*/
def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
def predicate(proxy: AnyRef) : Boolean = {
TypedActorModule.ensureEnabled
def predicate(proxy: AnyRef): Boolean = {
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass)
}
findTypedActor({ case a:AnyRef if predicate(a) => a })
findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a })
}
/**
* Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
*/
def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
def predicate(proxy: AnyRef) : Boolean = {
TypedActorModule.ensureEnabled
def predicate(proxy: AnyRef): Boolean = {
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
}
@ -209,7 +201,7 @@ object ActorRegistry extends ListenerManagement {
* Finds all typed actors that have a specific id.
*/
def typedActorsFor(id: String): Array[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
TypedActorModule.ensureEnabled
val actorRefs = actorsById values id
actorRefs.flatMap(typedActorFor(_))
}
@ -218,12 +210,10 @@ object ActorRegistry extends ListenerManagement {
* Finds the typed actor that has a specific UUID.
*/
def typedActorFor(uuid: Uuid): Option[AnyRef] = {
TypedActorModule.ensureTypedActorEnabled
TypedActorModule.ensureEnabled
val actorRef = actorsByUUID get uuid
if (actorRef eq null)
None
else
typedActorFor(actorRef)
if (actorRef eq null) None
else typedActorFor(actorRef)
}
/**
@ -265,20 +255,15 @@ object ActorRegistry extends ListenerManagement {
*/
def shutdownAll() {
log.info("Shutting down all actors in the system...")
if (TypedActorModule.isTypedActorEnabled) {
if (TypedActorModule.isEnabled) {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actorRef = elements.nextElement
val proxy = typedActorFor(actorRef)
if (proxy.isDefined) {
TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
} else {
actorRef.stop
}
if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
else actorRef.stop
}
} else {
foreach(_.stop)
}
} else foreach(_.stop)
actorsByUUID.clear
actorsById.clear
log.info("All actors have been shut down and unregistered from ActorRegistry")
@ -301,14 +286,18 @@ object ActorRegistry extends ListenerManagement {
private[akka] def actors(address: Address) = actorsFor(address).actors
private[akka] def actorsByUuid(address: Address) = actorsFor(address).actorsByUuid
private[akka] def actorsFactories(address: Address) = actorsFor(address).actorsFactories
private[akka] def typedActors(address: Address) = actorsFor(address).typedActors
private[akka] def typedActorsByUuid(address: Address) = actorsFor(address).typedActorsByUuid
private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories
private[akka] class RemoteActorSet {
private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistry] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
}
}
@ -337,16 +326,13 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
if (set ne null) {
set.synchronized {
if (set.isEmpty) {
retry = true //IF the set is empty then it has been removed, so signal retry
}
if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
else { //Else add the value to the set and signal that retry is not needed
added = set add v
retry = false
}
}
}
else {
} else {
val newSet = new ConcurrentSkipListSet[V]
newSet add v
@ -354,24 +340,20 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
val oldSet = container.putIfAbsent(k,newSet)
if (oldSet ne null) {
oldSet.synchronized {
if (oldSet.isEmpty) {
retry = true //IF the set is empty then it has been removed, so signal retry
}
if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
else { //Else try to add the value to the set and signal that retry is not needed
added = oldSet add v
retry = false
}
}
} else {
added = true
}
} else added = true
}
if (retry) spinPut(k,v)
if (retry) spinPut(k, v)
else added
}
spinPut(key,value)
spinPut(key, value)
}
/**
@ -390,10 +372,8 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
def findValue(key: K)(f: (V) => Boolean): Option[V] = {
import scala.collection.JavaConversions._
val set = container get key
if (set ne null)
set.iterator.find(f)
else
None
if (set ne null) set.iterator.find(f)
else None
}
/**
@ -420,8 +400,7 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
container.remove(key,emptySet) //We try to remove the key if it's mapped to an empty set
true //Remove succeeded
}
else false //Remove failed
} else false //Remove failed
}
} else false //Remove failed
}
@ -434,5 +413,5 @@ class Index[K <: AnyRef,V <: AnyRef : Manifest] {
/**
* Removes all keys and all values
*/
def clear = foreach { case (k,v) => remove(k,v) }
def clear = foreach { case (k, v) => remove(k, v) }
}

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.dispatch._
import akka.config.Supervision._
import akka.japi.Procedure
import akka.japi.{Creator, Procedure}
import java.net.InetSocketAddress
@ -90,9 +90,7 @@ abstract class UntypedActor extends Actor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait UntypedActorFactory {
def create: UntypedActor
}
trait UntypedActorFactory extends Creator[Actor]
/**
* Extend this abstract class to create a remote UntypedActor.

View file

@ -5,7 +5,7 @@
package akka.dispatch
import akka.actor.{ActorRef, IllegalActorStateException}
import akka.util.ReflectiveAccess.EnterpriseModule
import akka.util.ReflectiveAccess.AkkaCloudModule
import java.util.Queue
import akka.util.Switch
@ -123,10 +123,10 @@ class ExecutorBasedEventDrivenDispatcher(
*/
def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match {
// FIXME make generic (work for TypedActor as well)
case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case FileBasedDurableMailbox(serializer) => AkkaCloudModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case ZooKeeperBasedDurableMailbox(serializer) => AkkaCloudModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case BeanstalkBasedDurableMailbox(serializer) => AkkaCloudModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case RedisBasedDurableMailbox(serializer) => AkkaCloudModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue]
case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported")
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}

View file

@ -5,7 +5,7 @@
package akka.dispatch
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import akka.util.ReflectiveAccess.EnterpriseModule
import akka.util.ReflectiveAccess.AkkaCloudModule
import akka.AkkaException
import java.util.{Queue, List}
@ -42,15 +42,15 @@ case class BoundedMailbox(
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
}
abstract class DurableMailboxType(val serializer: EnterpriseModule.Serializer) extends MailboxType {
abstract class DurableMailboxType(val serializer: AkkaCloudModule.Serializer) extends MailboxType {
if (serializer eq null) throw new IllegalArgumentException("The serializer for DurableMailboxType can not be null")
}
case class FileBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
case class RedisBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
case class BeanstalkBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
case class ZooKeeperBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
case class AMQPBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
case class JMSBasedDurableMailbox(ser: EnterpriseModule.Serializer) extends DurableMailboxType(ser)
case class FileBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class RedisBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class BeanstalkBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class ZooKeeperBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class AMQPBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
case class JMSBasedDurableMailbox(ser: AkkaCloudModule.Serializer) extends DurableMailboxType(ser)
class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {

View file

@ -20,6 +20,14 @@ trait SideEffect {
def apply: Unit
}
/**
+ * A constructor/factory, takes no parameters but creates a new value of type T every call
+ */
trait Creator[T] {
def create: T
}
/**
* This class represents optional values. Instances of <code>Option</code>
* are either instances of case class <code>Some</code> or it is case

View file

@ -20,13 +20,13 @@ object ReflectiveAccess extends Logging {
val loader = getClass.getClassLoader
lazy val isRemotingEnabled = RemoteClientModule.isRemotingEnabled
lazy val isTypedActorEnabled = TypedActorModule.isTypedActorEnabled
lazy val isEnterpriseEnabled = EnterpriseModule.isEnterpriseEnabled
lazy val isRemotingEnabled = RemoteClientModule.isEnabled
lazy val isTypedActorEnabled = TypedActorModule.isEnabled
lazy val isAkkaCloudEnabled = AkkaCloudModule.isEnabled
def ensureRemotingEnabled = RemoteClientModule.ensureRemotingEnabled
def ensureTypedActorEnabled = TypedActorModule.ensureTypedActorEnabled
def ensureEnterpriseEnabled = EnterpriseModule.ensureEnterpriseEnabled
def ensureRemotingEnabled = RemoteClientModule.ensureEnabled
def ensureTypedActorEnabled = TypedActorModule.ensureEnabled
def ensureAkkaCloudEnabled = AkkaCloudModule.ensureEnabled
/**
* Reflective access to the RemoteClient module.
@ -56,32 +56,32 @@ object ReflectiveAccess extends Logging {
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
}
lazy val isRemotingEnabled = remoteClientObjectInstance.isDefined
lazy val isEnabled = remoteClientObjectInstance.isDefined
def ensureRemotingEnabled = if (!isRemotingEnabled) throw new ModuleNotAvailableException(
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
val remoteClientObjectInstance: Option[RemoteClientObject] =
getObjectFor("akka.remote.RemoteClient$")
def register(address: InetSocketAddress, uuid: Uuid) = {
ensureRemotingEnabled
ensureEnabled
remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid)
}
def unregister(address: InetSocketAddress, uuid: Uuid) = {
ensureRemotingEnabled
ensureEnabled
remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid)
}
def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = {
ensureRemotingEnabled
ensureEnabled
val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress)
remoteClient.registerSupervisorForActor(actorRef)
}
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = {
ensureRemotingEnabled
ensureEnabled
remoteClientObjectInstance.get.clientFor(hostname, port, loader)
}
@ -95,7 +95,7 @@ object ReflectiveAccess extends Logging {
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]] = {
ensureRemotingEnabled
ensureEnabled
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
}
@ -126,17 +126,17 @@ object ReflectiveAccess extends Logging {
getObjectFor("akka.remote.RemoteNode$")
def registerActor(address: InetSocketAddress, actorRef: ActorRef) = {
ensureRemotingEnabled
RemoteClientModule.ensureEnabled
remoteServerObjectInstance.get.registerActor(address, actorRef)
}
def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = {
ensureRemotingEnabled
RemoteClientModule.ensureEnabled
remoteServerObjectInstance.get.registerTypedActor(address, implementationClassName, proxy)
}
def unregister(actorRef: ActorRef) = {
ensureRemotingEnabled
RemoteClientModule.ensureEnabled
remoteNodeObjectInstance.get.unregister(actorRef)
}
}
@ -156,16 +156,16 @@ object ReflectiveAccess extends Logging {
def stop(anyRef: AnyRef) : Unit
}
lazy val isTypedActorEnabled = typedActorObjectInstance.isDefined
lazy val isEnabled = typedActorObjectInstance.isDefined
def ensureTypedActorEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
def ensureEnabled = if (!isTypedActorEnabled) throw new ModuleNotAvailableException(
"Can't load the typed actor module, make sure that akka-typed-actor.jar is on the classpath")
val typedActorObjectInstance: Option[TypedActorObject] =
getObjectFor("akka.actor.TypedActor$")
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
ensureTypedActorEnabled
ensureEnabled
if (typedActorObjectInstance.get.isJoinPointAndOneWay(message)) {
future.asInstanceOf[CompletableFuture[Option[_]]].completeWithResult(None)
}
@ -173,7 +173,7 @@ object ReflectiveAccess extends Logging {
}
}
object EnterpriseModule {
object AkkaCloudModule {
type Mailbox = {
def enqueue(message: MessageInvocation)
@ -185,27 +185,27 @@ object ReflectiveAccess extends Logging {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
}
lazy val isEnterpriseEnabled = clusterObjectInstance.isDefined
lazy val isEnabled = clusterObjectInstance.isDefined
val clusterObjectInstance: Option[AnyRef] =
getObjectFor("akka.cluster.Cluster$")
getObjectFor("akka.cloud.cluster.Cluster$")
val serializerClass: Option[Class[_]] =
getClassFor("akka.serialization.Serializer")
def ensureEnterpriseEnabled = if (!isEnterpriseEnabled) throw new ModuleNotAvailableException(
"Feature is only available in Akka Enterprise edition")
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
"Feature is only available in Akka Cloud")
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.FileBasedMailbox", actorRef)
def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.FileBasedMailbox", actorRef)
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.ZooKeeperBasedMailbox", actorRef)
def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.ZooKeeperBasedMailbox", actorRef)
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.BeanstalkBasedMailbox", actorRef)
def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.BeanstalkBasedMailbox", actorRef)
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.actor.mailbox.RedisBasedMailbox", actorRef)
def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("akka.cloud.cluster.RedisBasedMailbox", actorRef)
private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = {
ensureEnterpriseEnabled
ensureEnabled
createInstance(
mailboxClassname,
Array(classOf[ActorRef]),

View file

@ -16,11 +16,11 @@ class HotSwapSpec extends WordSpec with MustMatchers {
val a = actorOf( new Actor {
def receive = { case _ => _log += "default" }
}).start
a ! HotSwap {
a ! HotSwap( self => {
case _ =>
_log += "swapped"
barrier.await
}
})
a ! "swapped"
barrier.await
_log must be ("swapped")
@ -71,11 +71,11 @@ class HotSwapSpec extends WordSpec with MustMatchers {
barrier.reset
_log = ""
a ! HotSwap {
a ! HotSwap(self => {
case "swapped" =>
_log += "swapped"
barrier.await
}
})
a ! "swapped"
barrier.await

View file

@ -41,7 +41,7 @@ class ReceiveTimeoutSpec extends JUnitSuite {
assert(timeoutLatch.tryAwait(3, TimeUnit.SECONDS))
val swappedLatch = new StandardLatch
timeoutActor ! HotSwap({
timeoutActor ! HotSwap(self => {
case ReceiveTimeout => swappedLatch.open
})

View file

@ -27,7 +27,6 @@ object ActorRegistrySpec {
self.reply("got ping")
}
}
}
class ActorRegistrySpec extends JUnitSuite {

Some files were not shown because too many files have changed in this diff Show more