second step: remove LocalActorRefProvider.actors

- duplicate name detection done within ActorCell/ActorSystem (i.e. at
  parent level)
- no caching needed for local look-ups, might re-introduce cache in
  remote layer
- implement suitable equals/hashCode on ActorPaths
- fix some (unintended => buggy) name reuses which previously silently
  returned a different actor
- serialization & EventStreamSpec still failing, need to commit to merge
  in other stuff on which the future fixes will depend
This commit is contained in:
Roland 2011-11-29 16:32:50 +01:00
parent dad1c98c48
commit 3182fa3d73
17 changed files with 329 additions and 250 deletions

View file

@ -120,7 +120,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
RoundRobin,
NrOfInstances(3),
RemoteScope(Seq(
RemoteAddress("wallace", 2552), RemoteAddress("gromit", 2552))))))
RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552))))))
}
"be able to parse 'akka.actor.deployment._' with recipe" in {

View file

@ -12,16 +12,28 @@ import akka.dispatch.Future
class LocalActorRefProviderSpec extends AkkaSpec {
"An LocalActorRefProvider" must {
"find actor refs using actorFor" in {
val a = actorOf(Props(ctx { case _ }))
val b = system.actorFor(a.path)
a must be === b
}
"only create one instance of an actor with a specific address in a concurrent environment" in {
val impl = system.asInstanceOf[ActorSystemImpl]
val provider = impl.provider
provider.isInstanceOf[LocalActorRefProvider] must be(true)
(0 until 100) foreach { i // 100 concurrent runs
for (i 0 until 100) {
val address = "new-actor" + i
implicit val timeout = Timeout(5 seconds)
((1 to 4) map { _ Future { provider.actorOf(impl, Props(c { case _ }), impl.guardian, address) } }).map(_.get).distinct.size must be(1)
val actors = for (j 1 to 4) yield Future(system.actorOf(Props(c { case _ }), address))
val set = Set() ++ actors.map(_.await.value match {
case Some(Right(a: ActorRef)) 1
case Some(Left(ex: InvalidActorNameException)) 2
case x x
})
set must be === Set(1, 2)
}
}
}

View file

@ -77,6 +77,8 @@ class ActorKilledException private[akka] (message: String, cause: Throwable)
def this(msg: String) = this(msg, null);
}
case class InvalidActorNameException(message: String) extends AkkaException(message)
case class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable = null)
extends AkkaException(message, cause) with NoStackTrace {
def this(msg: String) = this(null, msg, null);

View file

@ -97,6 +97,10 @@ private[akka] class ActorCell(
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
protected def isDuplicate(name: String): Boolean = {
childrenRefs contains name
}
var currentMessage: Envelope = null
var actor: Actor = _
@ -152,8 +156,13 @@ private[akka] class ActorCell(
final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child)
final def getChild(name: String): Option[ActorRef] =
if (isTerminated) None else childrenRefs.get(name).map(_.child)
final def getChild(name: String): ActorRef =
if (isTerminated) null
else {
val c = childrenRefs
if (c contains name) c(name).child
else null
}
final def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender))
@ -360,9 +369,6 @@ private[akka] class ActorCell(
}
private def doTerminate() {
if (!system.provider.evict(self.path.toString))
system.eventStream.publish(Warning(self.toString, "evict of " + self.path.toString + " failed"))
dispatcher.detach(this)
try {

View file

@ -5,63 +5,20 @@ package akka.actor
import scala.annotation.tailrec
object ActorPath {
// this cannot really be changed due to usage of standard URI syntax
final val separator = "/"
val pattern = """(/[0-9a-zA-Z\-\_\$\.]+)+""".r.pattern
/**
* Create an actor path from a string.
*/
def apply(system: ActorSystem, path: String): ActorPath =
apply(system, split(path))
/**
* Create an actor path from an iterable.
*/
def apply(system: ActorSystem, path: Iterable[String]): ActorPath =
path.foldLeft(system.asInstanceOf[ActorSystemImpl].provider.rootPath)(_ / _)
/**
* Split a string path into an iterable.
*/
def split(path: String): Iterable[String] =
if (path.startsWith(separator))
path.substring(1).split(separator)
else
path.split(separator)
/**
* Join an iterable path into a string.
*/
def join(path: Iterable[String]): String =
path.mkString(separator, separator, "")
/**
* Is this string representation of a path valid?
*/
def valid(path: String): Boolean =
pattern.matcher(path).matches
/**
* Validate a path. Moved here from Address.validate.
* Throws an IllegalArgumentException if the path is invalid.
*/
def validate(path: String): Unit = {
if (!valid(path))
throw new IllegalArgumentException("Path [" + path + "] is not valid. Needs to follow this pattern: " + pattern)
}
}
/**
* Actor path is a unique path to an actor that shows the creation path
* up through the actor tree to the root actor.
*/
trait ActorPath {
sealed trait ActorPath {
/**
* The Address under which this path can be reached; walks up the tree to
* the RootActorPath.
*/
def address: Address = root.address
def address: Address
/**
* The name of the actor that this path refers to.
@ -84,7 +41,7 @@ trait ActorPath {
def /(child: Iterable[String]): ActorPath = (this /: child)(_ / _)
/**
* Sequence of names for this path.
* Sequence of names for this path. Performance implication: has to allocate a list.
*/
def pathElements: Iterable[String]
@ -92,13 +49,14 @@ trait ActorPath {
* Walk up the tree to obtain and return the RootActorPath.
*/
def root: RootActorPath
}
/**
* Root of the hierarchy of ActorPaths. There is exactly root per ActorSystem
* and node (for remote-enabled or clustered systems).
*/
class RootActorPath(override val address: Address, val name: String = ActorPath.separator) extends ActorPath {
final case class RootActorPath(address: Address, name: String = ActorPath.separator) extends ActorPath {
def parent: ActorPath = this
@ -108,10 +66,12 @@ class RootActorPath(override val address: Address, val name: String = ActorPath.
def pathElements: Iterable[String] = Iterable.empty
override val toString = address + ActorPath.separator
override val toString = address + name
}
class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath {
final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath {
def address: Address = root.address
def /(child: String): ActorPath = new ChildActorPath(this, child)
@ -133,6 +93,12 @@ class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath
rec(this)
}
// TODO research whether this should be cached somehow (might be fast enough, but creates GC pressure)
/*
* idea: add one field which holds the total length (because that is known)
* so that only one String needs to be allocated before traversal; this is
* cheaper than any cache
*/
override def toString = {
@tailrec
def rec(p: ActorPath, s: String): String = p match {
@ -142,5 +108,30 @@ class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath
}
rec(this, "")
}
override def equals(other: Any): Boolean = {
@tailrec
def rec(left: ActorPath, right: ActorPath): Boolean =
if (left eq right) true
else if (left.isInstanceOf[RootActorPath] || right.isInstanceOf[RootActorPath]) left == right
else left.name == right.name && rec(left.parent, right.parent)
other match {
case p: ActorPath rec(this, p)
case _ false
}
}
override def hashCode: Int = {
import scala.util.MurmurHash._
@tailrec
def rec(p: ActorPath, h: Int, c: Int, k: Int): Int = p match {
case r: RootActorPath extendHash(h, r.##, c, k)
case _ rec(p.parent, extendHash(h, stringHash(name), c, k), nextMagicA(c), nextMagicB(k))
}
finalizeHash(rec(this, startHash(42), startMagicA, startMagicB))
}
}

View file

@ -309,7 +309,7 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) {
// FIXME this is broken, but see above
def this(address: Address, path: String) = this(address.hostPort, 0, path)
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path)
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.host, remoteAddress.port, path)
def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE
@throws(classOf[java.io.ObjectStreamException])
@ -351,6 +351,15 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef {
protected[akka] def restart(cause: Throwable): Unit = ()
}
object MinimalActorRef {
def apply(_path: ActorPath)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef {
def path = _path
def address = path.toString
override def !(message: Any)(implicit sender: ActorRef = null): Unit =
if (receive.isDefinedAt(message)) receive(message)
}
}
case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)
object DeadLetterActorRef {
@ -399,7 +408,7 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
private def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
abstract class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deathWatch: DeathWatch, timeout: Timeout, val dispatcher: MessageDispatcher) extends MinimalActorRef {
class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deathWatch: DeathWatch, timeout: Timeout, val dispatcher: MessageDispatcher) extends MinimalActorRef {
final val result = new DefaultPromise[Any](timeout)(dispatcher)
override def name = path.name
@ -412,7 +421,7 @@ abstract class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deat
result onTimeout callback
}
protected def whenDone(): Unit
protected def whenDone(): Unit = {}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case Status.Success(r) result.completeWithResult(r)

View file

@ -17,21 +17,26 @@ import akka.AkkaException
import com.eaio.uuid.UUID
import akka.util.{ Duration, Switch, Helpers }
import akka.remote.RemoteAddress
import akka.remote.LocalOnly
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
/**
* Interface for all ActorRef providers to implement.
*/
trait ActorRefProvider {
def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String): ActorRef = actorOf(system, props, supervisor, name, false)
def actorFor(path: Iterable[String]): Option[ActorRef]
/**
* Reference to the supervisor used for all top-level user actors.
*/
def guardian: ActorRef
/**
* Reference to the supervisor used for all top-level system actors.
*/
def systemGuardian: ActorRef
/**
* Reference to the death watch service.
*/
def deathWatch: DeathWatch
// FIXME: remove/replace?
@ -47,6 +52,12 @@ trait ActorRefProvider {
def settings: ActorSystem.Settings
/**
* Initialization of an ActorRefProvider happens in two steps: first
* construction of the object with settings, eventStream, scheduler, etc.
* and thenwhen the ActorSystem is constructedthe second phase during
* which actors may be created (e.g. the guardians).
*/
def init(system: ActorSystemImpl)
private[akka] def deployer: Deployer
@ -54,21 +65,33 @@ trait ActorRefProvider {
private[akka] def scheduler: Scheduler
/**
* Create an Actor with the given name below the given supervisor.
* Actor factory with create-only semantics: will create an actor as
* described by props with the given supervisor and path (may be different
* in case of remote supervision). If systemService is true, deployment is
* bypassed (local-only).
*/
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef
def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean = false): ActorRef
/**
* Create an Actor with the given full path below the given supervisor.
*
* FIXME: Remove! this is dangerous!
* Create actor reference for a specified local or remote path. If no such
* actor exists, it will be (equivalent to) a dead letter reference.
*/
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef
def actorFor(path: ActorPath): ActorRef
/**
* Remove this path from the lookup map.
* Create actor reference for a specified local or remote path, which will
* be parsed using java.net.URI. If no such actor exists, it will be
* (equivalent to) a dead letter reference.
*/
private[akka] def evict(path: String): Boolean
def actorFor(s: String): ActorRef
/**
* Create actor reference for the specified child path starting at the root
* guardian. This method always returns an actor which is logically local,
* i.e. it cannot be used to obtain a reference to an actor which is not
* physically or logically attached to this actor system.
*/
def actorFor(p: Iterable[String]): ActorRef
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef]
@ -106,16 +129,20 @@ trait ActorRefFactory {
protected def randomName(): String
/**
* Child names must be unique within the context of one parent; implement
* this method to have the default implementation of actorOf perform the
* check (and throw an exception if necessary).
*/
protected def isDuplicate(name: String): Boolean
def actorOf(props: Props): ActorRef = provider.actorOf(systemImpl, props, guardian, randomName(), false)
/*
* TODO this will have to go at some point, because creating two actors with
* the same address can race on the cluster, and then you never know which
* implementation wins
*/
def actorOf(props: Props, name: String): ActorRef = {
if (name == null || name == "" || name.startsWith("$"))
throw new ActorInitializationException("actor name must not be null, empty or start with $")
throw new InvalidActorNameException("actor name must not be null, empty or start with $")
if (isDuplicate(name))
throw new InvalidActorNameException("actor name " + name + " is not unique!")
provider.actorOf(systemImpl, props, guardian, name, false)
}
@ -130,11 +157,13 @@ trait ActorRefFactory {
def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() creator.create()))
def actorFor(path: ActorPath): Option[ActorRef] = actorFor(path.pathElements)
def actorOf(creator: UntypedActorFactory, name: String): ActorRef = actorOf(Props(() creator.create()), name)
def actorFor(path: String): Option[ActorRef] = actorFor(ActorPath.split(path))
def actorFor(path: ActorPath) = provider.actorFor(path)
def actorFor(path: Iterable[String]): Option[ActorRef] = provider.actorFor(path)
def actorFor(path: String): ActorRef = provider.actorFor(path)
def actorFor(path: Iterable[String]): ActorRef = provider.actorFor(path)
}
class ActorRefProviderException(message: String) extends AkkaException(message)
@ -143,16 +172,17 @@ class ActorRefProviderException(message: String) extends AkkaException(message)
* Local ActorRef provider.
*/
class LocalActorRefProvider(
_systemName: String,
val settings: ActorSystem.Settings,
val eventStream: EventStream,
val scheduler: Scheduler,
val rootPath: ActorPath,
val nodename: String,
val clustername: String) extends ActorRefProvider {
val deadLetters: ActorRef) extends ActorRefProvider {
def this(settings: ActorSystem.Settings, eventStream: EventStream, scheduler: Scheduler) {
this(settings, eventStream, scheduler, new RootActorPath(LocalOnly), "local", "local")
}
val rootPath: ActorPath = new RootActorPath(LocalAddress(_systemName))
// FIXME remove both
val nodename: String = "local"
val clustername: String = "local"
val log = Logging(eventStream, "LocalActorRefProvider")
@ -162,14 +192,10 @@ class LocalActorRefProvider(
* generate name for temporary actor refs
*/
private val tempNumber = new AtomicLong
def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
private def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
private val tempNode = rootPath / "tmp"
def tempPath = tempNode / tempName
// FIXME (actor path): this could become a cache for the new tree traversal actorFor
// currently still used for tmp actors (e.g. ask actor refs)
private val actors = new ConcurrentHashMap[String, AnyRef]
/**
* Top-level anchor for the supervision hierarchy of this actor system. Will
* receive only Supervise/ChildTerminated system messages or Failure message.
@ -240,7 +266,7 @@ class LocalActorRefProvider(
private var system: ActorSystemImpl = _
def dispatcher: MessageDispatcher = system.dispatcher
lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher)
lazy val rootGuardian: ActorRef = actorOf(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true)
lazy val rootGuardian: ActorRef = new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true)
lazy val guardian: ActorRef = actorOf(system, guardianProps, rootGuardian, "app", true)
lazy val systemGuardian: ActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true)
@ -253,88 +279,58 @@ class LocalActorRefProvider(
deathWatch.subscribe(rootGuardian, systemGuardian)
}
// FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "system" name for now)
def actorFor(path: Iterable[String]): Option[ActorRef] = findInCache(ActorPath.join(path)) orElse findInTree(Some(guardian), path.tail)
def actorFor(path: String): ActorRef = path match {
case LocalActorPath(address, elems) if address == rootPath.address
findInTree(rootGuardian.asInstanceOf[LocalActorRef], elems)
case _ deadLetters
}
def actorFor(path: ActorPath): ActorRef = findInTree(rootGuardian.asInstanceOf[LocalActorRef], path.pathElements)
def actorFor(path: Iterable[String]): ActorRef = findInTree(rootGuardian.asInstanceOf[LocalActorRef], path)
@tailrec
private def findInTree(start: Option[ActorRef], path: Iterable[String]): Option[ActorRef] = {
private def findInTree(start: LocalActorRef, path: Iterable[String]): ActorRef = {
if (path.isEmpty) start
else {
val child = start match {
case Some(local: LocalActorRef) local.underlying.getChild(path.head)
case _ None
}
findInTree(child, path.tail)
else start.underlying.getChild(path.head) match {
case null deadLetters
case child: LocalActorRef findInTree(child, path.tail)
case _ deadLetters
}
}
private def findInCache(path: String): Option[ActorRef] = actors.get(path) match {
case null None
case actor: ActorRef Some(actor)
case future: Future[_] Some(future.get.asInstanceOf[ActorRef])
}
def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = {
val path = supervisor.path / name
(if (systemService) None else deployer.lookupDeployment(path.toString)) match {
/**
* Returns true if the actor was in the provider's cache and evicted successfully, else false.
*/
private[akka] def evict(path: String): Boolean = actors.remove(path) ne null
// create a local actor
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope))
new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
actorOf(system, props, supervisor, supervisor.path / name, systemService)
// create a routed actor ref
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope))
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = {
val name = path.name
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
actors.putIfAbsent(path.toString, newFuture) match {
case null
val actor: ActorRef = try {
(if (systemService) None else deployer.lookupDeployment(path.toString)) match { // see if the deployment already exists, if so use it, if not create actor
// create a local actor
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope))
new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
// create a routed actor ref
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope))
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct () new DirectRouter
case RouterType.Random () new RandomRouter
case RouterType.RoundRobin () new RoundRobinRouter
case RouterType.ScatterGather () new ScatterGatherFirstCompletedRouter()(
if (props.dispatcher == Props.defaultDispatcher) dispatcher else props.dispatcher, settings.ActorTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
}
val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i
val routedPath = path.parent / (path.name + ":" + i)
new LocalActorRef(system, props, supervisor, routedPath, systemService)
}
actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString)
case unknown throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
//TODO FIXME should we remove the mapping in "actors" here?
throw e
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct () new DirectRouter
case RouterType.Random () new RandomRouter
case RouterType.RoundRobin () new RoundRobinRouter
case RouterType.ScatterGather () new ScatterGatherFirstCompletedRouter()(
if (props.dispatcher == Props.defaultDispatcher) dispatcher else props.dispatcher, settings.ActorTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
}
newFuture completeWithResult actor
actors.replace(path.toString, newFuture, actor)
actor
case actor: ActorRef
actor
case future: Future[_]
future.get.asInstanceOf[ActorRef]
}
val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i
val routedPath = path.parent / (path.name + ":" + i)
new LocalActorRef(system, props, supervisor, routedPath, systemService)
}
actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString)
case unknown throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
}
}
/**
@ -356,7 +352,7 @@ class LocalActorRefProvider(
new RoutedActorRef(system, props, supervisor, name)
}
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(ActorPath.split(actor.path))
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = Some(actorFor(actor.path))
private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(rootPath.address, actor.path.toString)
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch
@ -367,8 +363,7 @@ class LocalActorRefProvider(
case t if t.duration.length <= 0
new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout
case t
val a = new AskActorRef(tempPath, this, deathWatch, t, dispatcher) { def whenDone() = actors.remove(this) }
assert(actors.putIfAbsent(a.path.toString, a) eq null) //If this fails, we're in deep trouble
val a = new AskActorRef(tempPath, this, deathWatch, t, dispatcher)
recipient.tell(message, a)
a.result
}

View file

@ -55,7 +55,7 @@ object ActorSystem {
def create(): ActorSystem = apply()
def apply(): ActorSystem = apply("default")
class Settings(cfg: Config) {
class Settings(cfg: Config, val name: String) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-actor-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
@ -287,11 +287,34 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
import ActorSystem._
val settings = new Settings(applicationConfig)
val settings = new Settings(applicationConfig, name)
protected def systemImpl = this
private[akka] def systemActorOf(props: Props, address: String): ActorRef = provider.actorOf(this, props, systemGuardian, address, true)
private val systemActors = new ConcurrentHashMap[String, ActorRef]
private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
if (systemActors.putIfAbsent(name, deadLetters) eq null) {
val actor = provider.actorOf(this, props, systemGuardian, name, true)
systemActors.replace(name, actor)
deathWatch.subscribe(systemActorsJanitor, actor)
actor
} else throw new InvalidActorNameException("system actor name " + name + " is not unique!")
}
private val actors = new ConcurrentHashMap[String, ActorRef]
protected def isDuplicate(name: String): Boolean = {
actors.putIfAbsent(name, deadLetters) ne null
}
override def actorOf(props: Props, name: String): ActorRef = {
val actor = super.actorOf(props, name)
// this would have thrown an exception in case of a duplicate name
actors.replace(name, actor)
deathWatch.subscribe(actorsJanitor, actor)
actor
}
import settings._
@ -302,25 +325,6 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel))
val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
case Left(e) throw e
case Right(b) b
}
val arguments = Seq(
classOf[Settings] -> settings,
classOf[EventStream] -> eventStream,
classOf[Scheduler] -> scheduler)
val types: Array[Class[_]] = arguments map (_._1) toArray
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match {
case Left(e: InvocationTargetException) throw e.getTargetException
case Left(e) throw e
case Right(p) p
}
}
val deadLetters = new DeadLetterActorRef(eventStream)
val deadLetterMailbox = new Mailbox(null) {
becomeClosed()
@ -333,6 +337,35 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
override def numberOfMessages = 0
}
val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
case Left(e) throw e
case Right(b) b
}
val arguments = Seq(
classOf[String] -> name,
classOf[Settings] -> settings,
classOf[EventStream] -> eventStream,
classOf[Scheduler] -> scheduler,
classOf[ActorRef] -> deadLetters)
val types: Array[Class[_]] = arguments map (_._1) toArray
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match {
case Left(e: InvocationTargetException) throw e.getTargetException
case Left(e) throw e
case Right(p) p
}
}
val actorsJanitor = MinimalActorRef(provider.rootPath) {
case Terminated(x) actors.remove(x.path.name)
}
val systemActorsJanitor = MinimalActorRef(provider.rootPath) {
case Terminated(x) systemActors.remove(x.path.name)
}
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher

View file

@ -2,6 +2,8 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import java.net.URI
import java.net.URISyntaxException
/**
* The address specifies the physical location under which an Actor can be
@ -19,3 +21,18 @@ case class LocalAddress(systemName: String) extends Address {
def protocol = "akka"
def hostPort = systemName
}
object LocalActorPath {
def unapply(addr: String): Option[(LocalAddress, Iterable[String])] = {
try {
val uri = new URI(addr)
if (uri.getScheme != "akka") return None
if (uri.getUserInfo != null) return None
if (uri.getHost == null) return None
if (uri.getPath == null) return None
Some(LocalAddress(uri.getHost), uri.getPath.split("/").drop(1))
} catch {
case _: URISyntaxException None
}
}
}

View file

@ -182,7 +182,7 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
}
if (port == 0) raiseRemoteNodeParsingError()
RemoteAddress(new InetSocketAddress(hostname, port))
RemoteAddress(settings.name, hostname, port)
}
RemoteScope(remoteAddresses)

View file

@ -5,9 +5,11 @@ package akka.event
import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated, ActorSystem, simpleName }
import akka.util.Subclassification
import java.util.concurrent.atomic.AtomicInteger
object EventStream {
implicit def fromActorSystem(system: ActorSystem) = system.eventStream
val generation = new AtomicInteger
}
class A(x: Int = 0) extends Exception("x=" + x)
@ -52,8 +54,12 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas
case ref: ActorRef watch(ref)
case Terminated(ref) unsubscribe(ref)
}
}), "MainBusReaper")
}), "MainBusReaper-" + EventStream.generation.incrementAndGet())
subscribers foreach (reaper ! _)
}
def stop() {
reaper.stop()
}
}

View file

@ -6,32 +6,40 @@ package akka.remote
import akka.actor._
import akka.AkkaException
import scala.reflect.BeanProperty
import java.io.{ PrintWriter, PrintStream }
import java.net.InetSocketAddress
import java.net.URI
import java.net.URISyntaxException
import java.net.InetAddress
object RemoteAddress {
def apply(host: String, port: Int): RemoteAddress = apply(new InetSocketAddress(host, port))
def apply(inetAddress: InetSocketAddress): RemoteAddress = inetAddress match {
case null null
case inet
val host = inet.getAddress match {
case null inet.getHostName //Fall back to given name
case other other.getHostAddress
}
val portNo = inet.getPort
RemoteAddress(portNo, host)
def apply(system: String, host: String, port: Int) = {
val ip = InetAddress.getByName(host)
new RemoteAddress(system, host, ip, port)
}
}
object LocalOnly extends RemoteAddress(0, "local")
case class RemoteAddress private[akka] (port: Int, hostname: String) extends Address {
case class RemoteAddress(system: String, host: String, ip: InetAddress, port: Int) extends Address {
def protocol = "akka"
@transient
lazy val hostPort = hostname + ":" + port
lazy val hostPort = system + "@" + host + ":" + port
}
object RemoteActorPath {
def unapply(addr: String): Option[(RemoteAddress, Iterable[String])] = {
try {
val uri = new URI(addr)
if (uri.getScheme != "akka") return None
if (uri.getUserInfo == null) return None
if (uri.getHost == null) return None
if (uri.getPort == -1) return None
if (uri.getPath == null) return None
Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort), uri.getPath.split("/").drop(1))
} catch {
case _: URISyntaxException None
}
}
}
class RemoteException(message: String) extends AkkaException(message)

View file

@ -63,9 +63,9 @@ class NetworkEventStream(system: ActorSystemImpl) {
import NetworkEventStream._
// FIXME: check that this supervision is correct
private[akka] val sender = system.provider.actorOf(system,
Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
system.systemGuardian, "network-event-sender", systemService = true)
private[akka] val sender =
system.systemActorOf(Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
"network-event-sender")
/**
* Registers a network event stream listener (asyncronously).

View file

@ -38,7 +38,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
private[remote] val remoteExtension = RemoteExtension(system)
private[remote] val serializationExtension = SerializationExtension(system)
private[remote] val remoteAddress = {
RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
RemoteAddress(system.name, remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
}
val failureDetector = new AccrualFailureDetector(system)
@ -141,13 +141,17 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
case Right(instance) instance.asInstanceOf[() Actor]
}
val actorPath = ActorPath(systemImpl, message.getActorPath)
val parent = system.actorFor(actorPath.parent)
if (parent.isDefined) {
systemImpl.provider.actorOf(systemImpl, Props(creator = actorFactory), parent.get, actorPath.name)
} else {
log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
message.getActorPath match {
case RemoteActorPath(addr, elems) if addr == remoteAddress && elems.size > 0
val name = elems.last
system.actorFor(elems.dropRight(1)) match {
case x if x eq system.deadLetters
log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
case parent
systemImpl.provider.actorOf(systemImpl, Props(creator = actorFactory), parent, name)
}
case _
log.error("remote path does not match path from message [{}]", message)
}
} else {
@ -251,7 +255,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
else
remote.system.deadLetters
lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath).getOrElse(remote.system.deadLetters)
lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath)
lazy val payload: Either[Throwable, AnyRef] =
if (input.hasException) Left(parseException())

View file

@ -30,18 +30,19 @@ import akka.serialization.SerializationExtension
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteActorRefProvider(
val systemName: String,
val settings: ActorSystem.Settings,
val eventStream: EventStream,
val scheduler: Scheduler) extends ActorRefProvider {
val scheduler: Scheduler,
_deadLetters: ActorRef) extends ActorRefProvider {
val log = Logging(eventStream, "RemoteActorRefProvider")
def deathWatch = local.deathWatch
def guardian = local.guardian
def systemGuardian = local.systemGuardian
def nodename = local.nodename
def clustername = local.clustername
def tempName = local.tempName
def nodename = remoteExtension.settings.NodeName
def clustername = remoteExtension.settings.ClusterName
private val actors = new ConcurrentHashMap[String, AnyRef]
@ -57,11 +58,10 @@ class RemoteActorRefProvider(
private lazy val remoteExtension = RemoteExtension(system)
private lazy val serializationExtension = SerializationExtension(system)
lazy val rootPath: ActorPath = {
val remoteAddress = RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
val remoteAddress = RemoteAddress(system.name, remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
new RootActorPath(remoteAddress)
}
private lazy val local = new LocalActorRefProvider(settings, eventStream, scheduler, rootPath,
remoteExtension.settings.NodeName, remoteExtension.settings.ClusterName)
private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters)
private[akka] lazy val remote = new Remote(system, nodename)
private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
@ -79,13 +79,10 @@ class RemoteActorRefProvider(
def dispatcher = local.dispatcher
def defaultTimeout = settings.ActorTimeout
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
actorOf(system, props, supervisor, supervisor.path / name, systemService)
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
if (systemService) local.actorOf(system, props, supervisor, name, systemService)
else {
val name = path.name
val path = supervisor.path / name
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
@ -144,7 +141,7 @@ class RemoteActorRefProvider(
}
val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a)
val remoteAddress = RemoteAddress(a.hostname, a.port)
val remoteAddress = RemoteAddress(system.name, a.host, a.port)
conns + (remoteAddress -> RemoteActorRef(remote.system.provider, remote.server, remoteAddress, path, None))
}
@ -182,11 +179,9 @@ class RemoteActorRefProvider(
new RoutedActorRef(system, props, supervisor, name)
}
def actorFor(path: Iterable[String]): Option[ActorRef] = actors.get(ActorPath.join(path)) match {
case null local.actorFor(path)
case actor: ActorRef Some(actor)
case future: Future[_] Some(future.get.asInstanceOf[ActorRef])
}
def actorFor(path: ActorPath): ActorRef = local.actorFor(path)
def actorFor(path: String): ActorRef = local.actorFor(path)
def actorFor(path: Iterable[String]): ActorRef = local.actorFor(path)
// TODO remove me
val optimizeLocal = new AtomicBoolean(true)
@ -195,7 +190,7 @@ class RemoteActorRefProvider(
/**
* Returns true if the actor was in the provider's cache and evicted successfully, else false.
*/
private[akka] def evict(path: String): Boolean = actors.remove(path) ne null
private[akka] def evict(path: ActorPath): Boolean = actors.remove(path) ne null
private[akka] def serialize(actor: ActorRef): SerializedActorRef = actor match {
case r: RemoteActorRef new SerializedActorRef(r.remoteAddress, actor.path.toString)
@ -203,12 +198,12 @@ class RemoteActorRefProvider(
}
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
val remoteAddress = RemoteAddress(actor.hostname, actor.port)
val remoteAddress = RemoteAddress(systemName, actor.hostname, actor.port)
if (optimizeLocalScoped_? && remoteAddress == remote.remoteAddress) {
local.actorFor(ActorPath.split(actor.path))
Some(local.actorFor(actor.path))
} else {
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", remote.remoteAddress, actor.path, remoteAddress)
Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here
Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / actor.path, None)) // FIXME I know, this is broken
}
}

View file

@ -152,7 +152,7 @@ class ActiveRemoteClient private[akka] (
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get)
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setHostname(senderRemoteAddress.hostname)
.setHostname(senderRemoteAddress.host)
.setPort(senderRemoteAddress.port)
.build)
connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build))
@ -166,7 +166,7 @@ class ActiveRemoteClient private[akka] (
def attemptReconnect(): Boolean = {
log.debug("Remote client reconnecting to [{}]", remoteAddress)
val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port))
val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip, remoteAddress.port))
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
@ -189,7 +189,7 @@ class ActiveRemoteClient private[akka] (
log.debug("Starting remote client connection to [{}]", remoteAddress)
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port))
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip, remoteAddress.port))
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
@ -512,7 +512,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(address.hostname, address.port)))
openChannels.add(bootstrap.bind(new InetSocketAddress(address.ip, address.port)))
remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport))
def shutdown() {
@ -645,7 +645,8 @@ class RemoteServerHandler(
instruction.getCommandType match {
case CommandType.CONNECT if UsePassiveConnections
val origin = instruction.getOrigin
val inbound = RemoteAddress(origin.getHostname, origin.getPort)
// FIXME need to include system-name in remote protocol
val inbound = RemoteAddress("BORKED", origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
remoteSupport.bindClient(inbound, client)
case CommandType.SHUTDOWN //TODO FIXME Dispose passive connection here
@ -664,7 +665,7 @@ class RemoteServerHandler(
private def getClientAddress(c: Channel): Option[RemoteAddress] =
c.getRemoteAddress match {
case inet: InetSocketAddress Some(RemoteAddress(inet))
case inet: InetSocketAddress Some(RemoteAddress("BORKED", inet.getHostName, inet.getPort)) // FIXME Broken!
case _ None
}
}

View file

@ -6,7 +6,7 @@ import akka.testkit.AkkaSpec
class AccrualFailureDetectorSpec extends AkkaSpec {
"An AccrualFailureDetector" must {
val conn = RemoteAddress(new InetSocketAddress("localhost", 2552))
val conn = RemoteAddress("tester", "localhost", 2552)
"mark node as available after a series of successful heartbeats" in {
val fd = new AccrualFailureDetector