diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 44672463be..0bc355cf69 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -4,16 +4,21 @@ package akka.actor -import akka.config.ConfigurationException -import akka.util.ReflectiveAccess -import akka.routing._ -import com.eaio.uuid.UUID -import akka.AkkaException -import akka.dispatch._ +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.{ TimeUnit, Executors } + import scala.annotation.tailrec -import org.jboss.netty.akka.util.HashedWheelTimer -import java.util.concurrent.{ TimeUnit, Executors, ConcurrentHashMap } -import akka.event.{ LoggingAdapter, ActorClassification, DeathWatch, Logging } + +import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } + +import akka.actor.Timeout.intToTimeout +import akka.config.ConfigurationException +import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise } +import akka.event.{ Logging, DeathWatch, ActorClassification } +import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } +import akka.util.Helpers +import akka.AkkaException /** * Interface for all ActorRef providers to implement. @@ -50,6 +55,8 @@ trait ActorRefProvider { private[akka] def terminationFuture: Future[ActorSystem.ExitStatus] private[akka] def dummyAskSender: ActorRef + + private[akka] def tempPath: String } /** @@ -66,14 +73,25 @@ trait ActorRefFactory { */ protected def guardian: ActorRef - def actorOf(props: Props): ActorRef = actorOf(props, Props.randomName) + private val number = new AtomicLong + + private def randomName: String = { + val l = number.getAndIncrement() + Helpers.base64(l) + } + + def actorOf(props: Props): ActorRef = provider.actorOf(props, guardian, randomName, false) /* * TODO this will have to go at some point, because creating two actors with * the same address can race on the cluster, and then you never know which * implementation wins */ - def actorOf(props: Props, name: String): ActorRef = provider.actorOf(props, guardian, name, false) + def actorOf(props: Props, name: String): ActorRef = { + if (name == null || name == "" || name.startsWith("$")) + throw new ActorInitializationException("actor name must not be null, empty or start with $") + provider.actorOf(props, guardian, name, false) + } def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]])) @@ -180,65 +198,55 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { private[akka] def actorOf(props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = { val name = path.name - if ((name eq null) || name == Props.randomName) { - val randomName: String = newUuid.toString - val newPath = path.parent / randomName - val actor = new LocalActorRef(app, props, supervisor, newPath, systemService = true) - actors.putIfAbsent(newPath.toString, actor) match { - case null ⇒ actor - case other ⇒ throw new IllegalStateException("Same path generated twice for: " + actor + " and " + other) - } - } else { - val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout? + val newFuture = Promise[ActorRef](5000)(app.dispatcher) // FIXME is this proper timeout? - actors.putIfAbsent(path.toString, newFuture) match { - case null ⇒ - val actor: ActorRef = try { - (if (systemService) None else deployer.lookupDeployment(path.toString)) match { // see if the deployment already exists, if so use it, if not create actor + actors.putIfAbsent(path.toString, newFuture) match { + case null ⇒ + val actor: ActorRef = try { + (if (systemService) None else deployer.lookupDeployment(path.toString)) match { // see if the deployment already exists, if so use it, if not create actor - // create a local actor - case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒ - new LocalActorRef(app, props, supervisor, path, systemService) // create a local actor + // create a local actor + case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒ + new LocalActorRef(app, props, supervisor, path, systemService) // create a local actor - // create a routed actor ref - case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒ + // create a routed actor ref + case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒ - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ () ⇒ new DirectRouter - case RouterType.Random ⇒ () ⇒ new RandomRouter - case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter - case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()( - if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher, app.AkkaConfig.ActorTimeout) - case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) - } + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + case RouterType.Direct ⇒ () ⇒ new DirectRouter + case RouterType.Random ⇒ () ⇒ new RandomRouter + case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter + case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()( + if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher, app.AkkaConfig.ActorTimeout) + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + } - val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒ - val routedPath = path.parent / (path.name + ":" + i) - new LocalActorRef(app, props, supervisor, routedPath, systemService) - } + val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒ + val routedPath = path.parent / (path.name + ":" + i) + new LocalActorRef(app, props, supervisor, routedPath, systemService) + } - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString) + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString) - case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown) - } - } catch { - case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error - //TODO FIXME should we remove the mapping in "actors" here? - throw e + case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown) } + } catch { + case e: Exception ⇒ + newFuture completeWithException e // so the other threads gets notified of error + //TODO FIXME should we remove the mapping in "actors" here? + throw e + } - newFuture completeWithResult actor - actors.replace(path.toString, newFuture, actor) - actor - case actor: ActorRef ⇒ - actor - case future: Future[_] ⇒ - future.get.asInstanceOf[ActorRef] - } + newFuture completeWithResult actor + actors.replace(path.toString, newFuture, actor) + actor + case actor: ActorRef ⇒ + actor + case future: Future[_] ⇒ + future.get.asInstanceOf[ActorRef] } } @@ -280,6 +288,12 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { } private[akka] val dummyAskSender = new DeadLetterActorRef(app) + + private val tempNumber = new AtomicLong + def tempPath = { + val l = tempNumber.getAndIncrement() + "$_" + Helpers.base64(l) + } } class LocalDeathWatch extends DeathWatch with ActorClassification { @@ -299,7 +313,6 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { } } -import org.jboss.netty.akka.util.{ HashedWheelTimer, TimerTask } class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 8005d9c631..3dc309f207 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -22,8 +22,9 @@ trait ActorDeployer { private[akka] def deploy(deployment: Deploy): Unit private[akka] def lookupDeploymentFor(path: String): Option[Deploy] def lookupDeployment(path: String): Option[Deploy] = path match { - case null | Props.`randomName` ⇒ None - case some ⇒ lookupDeploymentFor(some) + case null | "" ⇒ None + case s if s.startsWith("$") ⇒ None + case some ⇒ lookupDeploymentFor(some) } private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_)) } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index a42ff45f6e..c9b28e4fa4 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -30,7 +30,6 @@ object Props { final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None) final val noHotSwap: Stack[Actor.Receive] = Stack.empty - final val randomName: String = "" /** * The default Props instance, uses the settings from the Props object starting with default* diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 91152e6c33..f443be5a2f 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -129,7 +129,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomName, interface.getClassLoader) + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -137,7 +137,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, name, interface.getClassLoader) + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -145,7 +145,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomName, interface.getClassLoader) + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -153,7 +153,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, name, interface.getClassLoader) + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), interface.getClassLoader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -161,7 +161,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Props.randomName, loader) + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, loader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -169,7 +169,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, name, loader) + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), loader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -177,7 +177,7 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Props.randomName, loader) + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, loader) /** * Creates a new TypedActor proxy using the supplied Props, @@ -185,65 +185,65 @@ trait TypedActorFactory { this: ActorRefFactory ⇒ * all interfaces (Class.getInterfaces) if it's not an interface class */ def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, name, loader) + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Props.randomName, loader) + typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, None, loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, name, loader) + typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Some(name), loader) /** * Creates a new TypedActor proxy using the supplied Props, * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) */ - def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = Props.randomName, loader: ClassLoader = null)(implicit m: Manifest[T]): R = { + def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[T]): R = { val clazz = m.erasure.asInstanceOf[Class[T]] - typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, name, if (loader eq null) clazz.getClassLoader else loader) + typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, Option(name), if (loader eq null) clazz.getClassLoader else loader) } /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ - def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = Props.randomName, loader: ClassLoader = null)(implicit m: Manifest[R]): R = - typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Props.randomName, if (loader eq null) m.erasure.getClassLoader else loader) + def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R = + typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = - typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Props.randomName, loader) + typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, None, loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, name, loader) + typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Some(name), loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = - typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Props.randomName, loader) + typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, None, loader) /** * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * to create TypedActor proxies, use typedActorOf */ def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, name, loader) + typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Some(name), loader) } @@ -301,15 +301,15 @@ class TypedActor(val app: ActorSystem) { } else null - private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = { + private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: Option[String], loader: ClassLoader): R = { val proxyVar = new AtomVar[R] configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader) } - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: String, loader: ClassLoader): R = + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: Option[String], loader: ClassLoader): R = createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, name, loader) - private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: String, loader: ClassLoader): T = { + private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) val timeout = props.timeout match { @@ -318,7 +318,7 @@ class TypedActor(val app: ActorSystem) { } val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar, timeout)).asInstanceOf[T] proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - val ref = supervisor.actorOf(props, name) + val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props) actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet proxyVar.get } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 14af35523c..67a77aa150 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -5,6 +5,7 @@ package akka.util import java.io.{ PrintWriter, StringWriter } import java.util.Comparator +import scala.annotation.tailrec /** * @author Jonas Bonér @@ -40,6 +41,16 @@ object Helpers { (0 until 4).foldLeft(0)((value, index) ⇒ value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8))) } + final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789*?" + + @tailrec + def base64(l: Long, sb: StringBuilder = new StringBuilder("$")): String = { + sb += base64chars.charAt(l.toInt & 63) + val next = l >>> 6 + if (next == 0) sb.toString + else base64(next, sb) + } + def ignore[E: Manifest](body: ⇒ Unit) { try { body diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index e81d764140..2763080121 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -65,7 +65,7 @@ class NetworkEventStream(val app: ActorSystem) { // FIXME: check that this supervision is correct private[akka] val sender = app.provider.actorOf( Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), - app.guardian, Props.randomName, systemService = true) + app.systemGuardian, "network-event-sender", systemService = true) /** * Registers a network event stream listener (asyncronously). diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index a56e6be38a..ef29a8de00 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -188,7 +188,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case f: Function0[_] ⇒ try { f() } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) } // FIXME: handle real remote supervision @@ -197,7 +197,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) } // FIXME: handle real remote supervision @@ -206,7 +206,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) } // FIXME: handle real remote supervision @@ -215,7 +215,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { Props( context ⇒ { case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() } - }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) + }).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / app.provider.tempPath, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) } def handleFailover(message: RemoteSystemDaemonMessageProtocol) { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index e501373fef..1eeeb2ccf5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -243,6 +243,8 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) private[akka] def dummyAskSender = local.dummyAskSender + + private[akka] def tempPath = local.tempPath } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 5226f0721d..8843330c03 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -9,6 +9,7 @@ import akka.util.ReflectiveAccess import com.eaio.uuid.UUID import akka.actor.Props._ import akka.actor.ActorSystem +import java.util.concurrent.atomic.AtomicLong /** * This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it @@ -41,23 +42,25 @@ class TestActorRef[T <: Actor](_app: ActorSystem, _props: Props, _supervisor: Ac object TestActorRef { - def apply[T <: Actor](factory: ⇒ T)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), Props.randomName) + private val number = new AtomicLong + private[testkit] def randomName: String = { + val l = number.getAndIncrement() + "$" + akka.util.Helpers.base64(l) + } + + def apply[T <: Actor](factory: ⇒ T)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), randomName) def apply[T <: Actor](factory: ⇒ T, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), name) - def apply[T <: Actor](props: Props)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, Props.randomName) + def apply[T <: Actor](props: Props)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, randomName) def apply[T <: Actor](props: Props, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, app.guardian, name) - def apply[T <: Actor](props: Props, supervisor: ActorRef, givenName: String)(implicit app: ActorSystem): TestActorRef[T] = { - val name: String = givenName match { - case null | Props.randomName ⇒ newUuid.toString - case given ⇒ given - } + def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit app: ActorSystem): TestActorRef[T] = { new TestActorRef(app, props, supervisor, name) } - def apply[T <: Actor](implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](Props.randomName) + def apply[T <: Actor](implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](randomName) def apply[T <: Actor](name: String)(implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](Props({ import ReflectiveAccess.{ createInstance, noParams, noArgs } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 2ff772dcec..87b6aa6765 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -81,7 +81,7 @@ class TestFSMRef[S, D, T <: Actor](app: ActorSystem, props: Props, supervisor: A object TestFSMRef { def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] = - new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, Props.randomName) + new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, TestActorRef.randomName) def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] = new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, name)