introduce base64 random names
- remove Props.randomName and associated logic - ActorRefFactory contains AtomicLong which is used to generate unique children names - base64-like encoding is used with reverse “digit” order - disallow given names which are null, empty or start with ‘$’ - random names start have ‘$’ prepended (‘$’ not being one of the 64 characters) - special marker “$_” for tempPath until “/tmp” supervisor is introduced - TestActorRef uses globally unique “$$” prefix, as it creates actors beneath any supervisor as instructed by the user
This commit is contained in:
parent
5d85ab37e3
commit
a08234cd9a
10 changed files with 130 additions and 101 deletions
|
|
@ -4,16 +4,21 @@
|
||||||
|
|
||||||
package akka.actor
|
package akka.actor
|
||||||
|
|
||||||
import akka.config.ConfigurationException
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import akka.util.ReflectiveAccess
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import akka.routing._
|
import java.util.concurrent.{ TimeUnit, Executors }
|
||||||
import com.eaio.uuid.UUID
|
|
||||||
import akka.AkkaException
|
|
||||||
import akka.dispatch._
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import org.jboss.netty.akka.util.HashedWheelTimer
|
|
||||||
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentHashMap }
|
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
|
||||||
import akka.event.{ LoggingAdapter, ActorClassification, DeathWatch, Logging }
|
|
||||||
|
import akka.actor.Timeout.intToTimeout
|
||||||
|
import akka.config.ConfigurationException
|
||||||
|
import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise }
|
||||||
|
import akka.event.{ Logging, DeathWatch, ActorClassification }
|
||||||
|
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter }
|
||||||
|
import akka.util.Helpers
|
||||||
|
import akka.AkkaException
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for all ActorRef providers to implement.
|
* Interface for all ActorRef providers to implement.
|
||||||
|
|
@ -50,6 +55,8 @@ trait ActorRefProvider {
|
||||||
private[akka] def terminationFuture: Future[ActorSystem.ExitStatus]
|
private[akka] def terminationFuture: Future[ActorSystem.ExitStatus]
|
||||||
|
|
||||||
private[akka] def dummyAskSender: ActorRef
|
private[akka] def dummyAskSender: ActorRef
|
||||||
|
|
||||||
|
private[akka] def tempPath: String
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -66,14 +73,25 @@ trait ActorRefFactory {
|
||||||
*/
|
*/
|
||||||
protected def guardian: ActorRef
|
protected def guardian: ActorRef
|
||||||
|
|
||||||
def actorOf(props: Props): ActorRef = actorOf(props, Props.randomName)
|
private val number = new AtomicLong
|
||||||
|
|
||||||
|
private def randomName: String = {
|
||||||
|
val l = number.getAndIncrement()
|
||||||
|
Helpers.base64(l)
|
||||||
|
}
|
||||||
|
|
||||||
|
def actorOf(props: Props): ActorRef = provider.actorOf(props, guardian, randomName, false)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TODO this will have to go at some point, because creating two actors with
|
* TODO this will have to go at some point, because creating two actors with
|
||||||
* the same address can race on the cluster, and then you never know which
|
* the same address can race on the cluster, and then you never know which
|
||||||
* implementation wins
|
* implementation wins
|
||||||
*/
|
*/
|
||||||
def actorOf(props: Props, name: String): ActorRef = provider.actorOf(props, guardian, name, false)
|
def actorOf(props: Props, name: String): ActorRef = {
|
||||||
|
if (name == null || name == "" || name.startsWith("$"))
|
||||||
|
throw new ActorInitializationException("actor name must not be null, empty or start with $")
|
||||||
|
provider.actorOf(props, guardian, name, false)
|
||||||
|
}
|
||||||
|
|
||||||
def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]))
|
def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]))
|
||||||
|
|
||||||
|
|
@ -180,65 +198,55 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
||||||
|
|
||||||
private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = {
|
private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = {
|
||||||
val name = path.name
|
val name = path.name
|
||||||
if ((name eq null) || name == Props.randomName) {
|
val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout?
|
||||||
val randomName: String = newUuid.toString
|
|
||||||
val newPath = path.parent / randomName
|
|
||||||
val actor = new LocalActorRef(app, props, supervisor, newPath, systemService = true)
|
|
||||||
actors.putIfAbsent(newPath.toString, actor) match {
|
|
||||||
case null ⇒ actor
|
|
||||||
case other ⇒ throw new IllegalStateException("Same path generated twice for: " + actor + " and " + other)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout?
|
|
||||||
|
|
||||||
actors.putIfAbsent(path.toString, newFuture) match {
|
actors.putIfAbsent(path.toString, newFuture) match {
|
||||||
case null ⇒
|
case null ⇒
|
||||||
val actor: ActorRef = try {
|
val actor: ActorRef = try {
|
||||||
(if (systemService) None else deployer.lookupDeployment(path.toString)) match { // see if the deployment already exists, if so use it, if not create actor
|
(if (systemService) None else deployer.lookupDeployment(path.toString)) match { // see if the deployment already exists, if so use it, if not create actor
|
||||||
|
|
||||||
// create a local actor
|
// create a local actor
|
||||||
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒
|
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒
|
||||||
new LocalActorRef(app, props, supervisor, path, systemService) // create a local actor
|
new LocalActorRef(app, props, supervisor, path, systemService) // create a local actor
|
||||||
|
|
||||||
// create a routed actor ref
|
// create a routed actor ref
|
||||||
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒
|
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒
|
||||||
|
|
||||||
val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
|
val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
|
||||||
case RouterType.Direct ⇒ () ⇒ new DirectRouter
|
case RouterType.Direct ⇒ () ⇒ new DirectRouter
|
||||||
case RouterType.Random ⇒ () ⇒ new RandomRouter
|
case RouterType.Random ⇒ () ⇒ new RandomRouter
|
||||||
case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter
|
case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter
|
||||||
case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()(
|
case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()(
|
||||||
if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher, app.AkkaConfig.ActorTimeout)
|
if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher, app.AkkaConfig.ActorTimeout)
|
||||||
case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
|
||||||
case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
|
||||||
case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
|
||||||
case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
|
case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒
|
val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒
|
||||||
val routedPath = path.parent / (path.name + ":" + i)
|
val routedPath = path.parent / (path.name + ":" + i)
|
||||||
new LocalActorRef(app, props, supervisor, routedPath, systemService)
|
new LocalActorRef(app, props, supervisor, routedPath, systemService)
|
||||||
}
|
}
|
||||||
|
|
||||||
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString)
|
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString)
|
||||||
|
|
||||||
case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
|
case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case e: Exception ⇒
|
|
||||||
newFuture completeWithException e // so the other threads gets notified of error
|
|
||||||
//TODO FIXME should we remove the mapping in "actors" here?
|
|
||||||
throw e
|
|
||||||
}
|
}
|
||||||
|
} catch {
|
||||||
|
case e: Exception ⇒
|
||||||
|
newFuture completeWithException e // so the other threads gets notified of error
|
||||||
|
//TODO FIXME should we remove the mapping in "actors" here?
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
|
||||||
newFuture completeWithResult actor
|
newFuture completeWithResult actor
|
||||||
actors.replace(path.toString, newFuture, actor)
|
actors.replace(path.toString, newFuture, actor)
|
||||||
actor
|
actor
|
||||||
case actor: ActorRef ⇒
|
case actor: ActorRef ⇒
|
||||||
actor
|
actor
|
||||||
case future: Future[_] ⇒
|
case future: Future[_] ⇒
|
||||||
future.get.asInstanceOf[ActorRef]
|
future.get.asInstanceOf[ActorRef]
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -280,6 +288,12 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] val dummyAskSender = new DeadLetterActorRef(app)
|
private[akka] val dummyAskSender = new DeadLetterActorRef(app)
|
||||||
|
|
||||||
|
private val tempNumber = new AtomicLong
|
||||||
|
def tempPath = {
|
||||||
|
val l = tempNumber.getAndIncrement()
|
||||||
|
"$_" + Helpers.base64(l)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class LocalDeathWatch extends DeathWatch with ActorClassification {
|
class LocalDeathWatch extends DeathWatch with ActorClassification {
|
||||||
|
|
@ -299,7 +313,6 @@ class LocalDeathWatch extends DeathWatch with ActorClassification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
import org.jboss.netty.akka.util.{ HashedWheelTimer, TimerTask }
|
|
||||||
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
|
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
|
||||||
|
|
||||||
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
|
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
|
||||||
|
|
|
||||||
|
|
@ -22,8 +22,9 @@ trait ActorDeployer {
|
||||||
private[akka] def deploy(deployment: Deploy): Unit
|
private[akka] def deploy(deployment: Deploy): Unit
|
||||||
private[akka] def lookupDeploymentFor(path: String): Option[Deploy]
|
private[akka] def lookupDeploymentFor(path: String): Option[Deploy]
|
||||||
def lookupDeployment(path: String): Option[Deploy] = path match {
|
def lookupDeployment(path: String): Option[Deploy] = path match {
|
||||||
case null | Props.`randomName` ⇒ None
|
case null | "" ⇒ None
|
||||||
case some ⇒ lookupDeploymentFor(some)
|
case s if s.startsWith("$") ⇒ None
|
||||||
|
case some ⇒ lookupDeploymentFor(some)
|
||||||
}
|
}
|
||||||
private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
|
private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,6 @@ object Props {
|
||||||
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None)
|
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None)
|
||||||
|
|
||||||
final val noHotSwap: Stack[Actor.Receive] = Stack.empty
|
final val noHotSwap: Stack[Actor.Receive] = Stack.empty
|
||||||
final val randomName: String = ""
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default Props instance, uses the settings from the Props object starting with default*
|
* The default Props instance, uses the settings from the Props object starting with default*
|
||||||
|
|
|
||||||
|
|
@ -129,7 +129,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomName, interface.getClassLoader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, interface.getClassLoader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
|
|
@ -137,7 +137,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, name, interface.getClassLoader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), interface.getClassLoader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
|
|
@ -145,7 +145,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomName, interface.getClassLoader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, interface.getClassLoader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
|
|
@ -153,7 +153,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, name, interface.getClassLoader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), interface.getClassLoader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
|
|
@ -161,7 +161,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomName, loader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
|
|
@ -169,7 +169,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, name, loader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
|
|
@ -177,7 +177,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomName, loader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
|
|
@ -185,65 +185,65 @@ trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||||
* all interfaces (Class.getInterfaces) if it's not an interface class
|
* all interfaces (Class.getInterfaces) if it's not an interface class
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R =
|
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R =
|
||||||
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, name, loader)
|
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
|
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R =
|
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R =
|
||||||
typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Props.randomName, loader)
|
typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, None, loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
|
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
|
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
|
||||||
typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, name, loader)
|
typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Some(name), loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new TypedActor proxy using the supplied Props,
|
* Creates a new TypedActor proxy using the supplied Props,
|
||||||
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
|
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
|
||||||
*/
|
*/
|
||||||
def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = Props.randomName, loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
|
def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
|
||||||
val clazz = m.erasure.asInstanceOf[Class[T]]
|
val clazz = m.erasure.asInstanceOf[Class[T]]
|
||||||
typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, name, if (loader eq null) clazz.getClassLoader else loader)
|
typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, Option(name), if (loader eq null) clazz.getClassLoader else loader)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
||||||
* to create TypedActor proxies, use typedActorOf
|
* to create TypedActor proxies, use typedActorOf
|
||||||
*/
|
*/
|
||||||
def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = Props.randomName, loader: ClassLoader = null)(implicit m: Manifest[R]): R =
|
def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R =
|
||||||
typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Props.randomName, if (loader eq null) m.erasure.getClassLoader else loader)
|
typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
||||||
* to create TypedActor proxies, use typedActorOf
|
* to create TypedActor proxies, use typedActorOf
|
||||||
*/
|
*/
|
||||||
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R =
|
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R =
|
||||||
typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Props.randomName, loader)
|
typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, None, loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
||||||
* to create TypedActor proxies, use typedActorOf
|
* to create TypedActor proxies, use typedActorOf
|
||||||
*/
|
*/
|
||||||
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R =
|
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R =
|
||||||
typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, name, loader)
|
typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Some(name), loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
||||||
* to create TypedActor proxies, use typedActorOf
|
* to create TypedActor proxies, use typedActorOf
|
||||||
*/
|
*/
|
||||||
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R =
|
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R =
|
||||||
typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Props.randomName, loader)
|
typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, None, loader)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
|
||||||
* to create TypedActor proxies, use typedActorOf
|
* to create TypedActor proxies, use typedActorOf
|
||||||
*/
|
*/
|
||||||
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, name: String, loader: ClassLoader): R =
|
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, name: String, loader: ClassLoader): R =
|
||||||
typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, name, loader)
|
typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Some(name), loader)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -301,15 +301,15 @@ class TypedActor(val app: ActorSystem) {
|
||||||
}
|
}
|
||||||
else null
|
else null
|
||||||
|
|
||||||
private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = {
|
private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: Option[String], loader: ClassLoader): R = {
|
||||||
val proxyVar = new AtomVar[R]
|
val proxyVar = new AtomVar[R]
|
||||||
configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader)
|
configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: String, loader: ClassLoader): R =
|
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: Option[String], loader: ClassLoader): R =
|
||||||
createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, name, loader)
|
createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, name, loader)
|
||||||
|
|
||||||
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: String, loader: ClassLoader): T = {
|
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = {
|
||||||
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
|
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
|
||||||
val actorVar = new AtomVar[ActorRef](null)
|
val actorVar = new AtomVar[ActorRef](null)
|
||||||
val timeout = props.timeout match {
|
val timeout = props.timeout match {
|
||||||
|
|
@ -318,7 +318,7 @@ class TypedActor(val app: ActorSystem) {
|
||||||
}
|
}
|
||||||
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar, timeout)).asInstanceOf[T]
|
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar, timeout)).asInstanceOf[T]
|
||||||
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
|
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
|
||||||
val ref = supervisor.actorOf(props, name)
|
val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props)
|
||||||
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
|
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
|
||||||
proxyVar.get
|
proxyVar.get
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.util
|
||||||
|
|
||||||
import java.io.{ PrintWriter, StringWriter }
|
import java.io.{ PrintWriter, StringWriter }
|
||||||
import java.util.Comparator
|
import java.util.Comparator
|
||||||
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
|
|
@ -40,6 +41,16 @@ object Helpers {
|
||||||
(0 until 4).foldLeft(0)((value, index) ⇒ value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8)))
|
(0 until 4).foldLeft(0)((value, index) ⇒ value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789*?"
|
||||||
|
|
||||||
|
@tailrec
|
||||||
|
def base64(l: Long, sb: StringBuilder = new StringBuilder("$")): String = {
|
||||||
|
sb += base64chars.charAt(l.toInt & 63)
|
||||||
|
val next = l >>> 6
|
||||||
|
if (next == 0) sb.toString
|
||||||
|
else base64(next, sb)
|
||||||
|
}
|
||||||
|
|
||||||
def ignore[E: Manifest](body: ⇒ Unit) {
|
def ignore[E: Manifest](body: ⇒ Unit) {
|
||||||
try {
|
try {
|
||||||
body
|
body
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ class NetworkEventStream(val app: ActorSystem) {
|
||||||
// FIXME: check that this supervision is correct
|
// FIXME: check that this supervision is correct
|
||||||
private[akka] val sender = app.provider.actorOf(
|
private[akka] val sender = app.provider.actorOf(
|
||||||
Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
|
Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
|
||||||
app.guardian, Props.randomName, systemService = true)
|
app.systemGuardian, "network-event-sender", systemService = true)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers a network event stream listener (asyncronously).
|
* Registers a network event stream listener (asyncronously).
|
||||||
|
|
|
||||||
|
|
@ -188,7 +188,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
||||||
Props(
|
Props(
|
||||||
context ⇒ {
|
context ⇒ {
|
||||||
case f: Function0[_] ⇒ try { f() } finally { context.self.stop() }
|
case f: Function0[_] ⇒ try { f() } finally { context.self.stop() }
|
||||||
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
|
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: handle real remote supervision
|
// FIXME: handle real remote supervision
|
||||||
|
|
@ -197,7 +197,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
||||||
Props(
|
Props(
|
||||||
context ⇒ {
|
context ⇒ {
|
||||||
case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() }
|
case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() }
|
||||||
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
|
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: handle real remote supervision
|
// FIXME: handle real remote supervision
|
||||||
|
|
@ -206,7 +206,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
||||||
Props(
|
Props(
|
||||||
context ⇒ {
|
context ⇒ {
|
||||||
case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() }
|
case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() }
|
||||||
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
|
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: handle real remote supervision
|
// FIXME: handle real remote supervision
|
||||||
|
|
@ -215,7 +215,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
||||||
Props(
|
Props(
|
||||||
context ⇒ {
|
context ⇒ {
|
||||||
case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() }
|
case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() }
|
||||||
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
|
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
|
||||||
}
|
}
|
||||||
|
|
||||||
def handleFailover(message: RemoteSystemDaemonMessageProtocol) {
|
def handleFailover(message: RemoteSystemDaemonMessageProtocol) {
|
||||||
|
|
|
||||||
|
|
@ -243,6 +243,8 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
||||||
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
|
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
|
||||||
|
|
||||||
private[akka] def dummyAskSender = local.dummyAskSender
|
private[akka] def dummyAskSender = local.dummyAskSender
|
||||||
|
|
||||||
|
private[akka] def tempPath = local.tempPath
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import akka.util.ReflectiveAccess
|
||||||
import com.eaio.uuid.UUID
|
import com.eaio.uuid.UUID
|
||||||
import akka.actor.Props._
|
import akka.actor.Props._
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it
|
* This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it
|
||||||
|
|
@ -41,23 +42,25 @@ class TestActorRef[T <: Actor](_app: ActorSystem, _props: Props, _supervisor: Ac
|
||||||
|
|
||||||
object TestActorRef {
|
object TestActorRef {
|
||||||
|
|
||||||
def apply[T <: Actor](factory: ⇒ T)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), Props.randomName)
|
private val number = new AtomicLong
|
||||||
|
private[testkit] def randomName: String = {
|
||||||
|
val l = number.getAndIncrement()
|
||||||
|
"$" + akka.util.Helpers.base64(l)
|
||||||
|
}
|
||||||
|
|
||||||
|
def apply[T <: Actor](factory: ⇒ T)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), randomName)
|
||||||
|
|
||||||
def apply[T <: Actor](factory: ⇒ T, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), name)
|
def apply[T <: Actor](factory: ⇒ T, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), name)
|
||||||
|
|
||||||
def apply[T <: Actor](props: Props)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, Props.randomName)
|
def apply[T <: Actor](props: Props)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, randomName)
|
||||||
|
|
||||||
def apply[T <: Actor](props: Props, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, app.guardian, name)
|
def apply[T <: Actor](props: Props, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, app.guardian, name)
|
||||||
|
|
||||||
def apply[T <: Actor](props: Props, supervisor: ActorRef, givenName: String)(implicit app: ActorSystem): TestActorRef[T] = {
|
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit app: ActorSystem): TestActorRef[T] = {
|
||||||
val name: String = givenName match {
|
|
||||||
case null | Props.randomName ⇒ newUuid.toString
|
|
||||||
case given ⇒ given
|
|
||||||
}
|
|
||||||
new TestActorRef(app, props, supervisor, name)
|
new TestActorRef(app, props, supervisor, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
def apply[T <: Actor](implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](Props.randomName)
|
def apply[T <: Actor](implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](randomName)
|
||||||
|
|
||||||
def apply[T <: Actor](name: String)(implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](Props({
|
def apply[T <: Actor](name: String)(implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](Props({
|
||||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ class TestFSMRef[S, D, T <: Actor](app: ActorSystem, props: Props, supervisor: A
|
||||||
object TestFSMRef {
|
object TestFSMRef {
|
||||||
|
|
||||||
def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] =
|
def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] =
|
||||||
new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, Props.randomName)
|
new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, TestActorRef.randomName)
|
||||||
|
|
||||||
def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] =
|
def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] =
|
||||||
new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, name)
|
new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, name)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue