From 3182fa3d73a8a488148bd886ca166f2f7b07a5c0 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 29 Nov 2011 16:32:50 +0100 Subject: [PATCH] second step: remove LocalActorRefProvider.actors - duplicate name detection done within ActorCell/ActorSystem (i.e. at parent level) - no caching needed for local look-ups, might re-introduce cache in remote layer - implement suitable equals/hashCode on ActorPaths - fix some (unintended => buggy) name reuses which previously silently returned a different actor - serialization & EventStreamSpec still failing, need to commit to merge in other stuff on which the future fixes will depend --- .../test/scala/akka/actor/DeployerSpec.scala | 2 +- .../actor/LocalActorRefProviderSpec.scala | 16 +- .../src/main/scala/akka/actor/Actor.scala | 2 + .../src/main/scala/akka/actor/ActorCell.scala | 16 +- .../src/main/scala/akka/actor/ActorPath.scala | 91 ++++---- .../src/main/scala/akka/actor/ActorRef.scala | 15 +- .../scala/akka/actor/ActorRefProvider.scala | 209 +++++++++--------- .../main/scala/akka/actor/ActorSystem.scala | 77 +++++-- .../src/main/scala/akka/actor/Address.scala | 17 ++ .../src/main/scala/akka/actor/Deployer.scala | 2 +- .../main/scala/akka/event/EventStream.scala | 8 +- .../scala/akka/remote/RemoteInterface.scala | 40 ++-- .../akka/remote/NetworkEventStream.scala | 6 +- .../src/main/scala/akka/remote/Remote.scala | 22 +- .../akka/remote/RemoteActorRefProvider.scala | 41 ++-- .../remote/netty/NettyRemoteSupport.scala | 13 +- .../remote/AccrualFailureDetectorSpec.scala | 2 +- 17 files changed, 329 insertions(+), 250 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 68a81d9797..b6dae1a6de 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -120,7 +120,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { RoundRobin, NrOfInstances(3), RemoteScope(Seq( - RemoteAddress("wallace", 2552), RemoteAddress("gromit", 2552)))))) + RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552)))))) } "be able to parse 'akka.actor.deployment._' with recipe" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 707c425295..de59894431 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -12,16 +12,28 @@ import akka.dispatch.Future class LocalActorRefProviderSpec extends AkkaSpec { "An LocalActorRefProvider" must { + "find actor refs using actorFor" in { + val a = actorOf(Props(ctx ⇒ { case _ ⇒ })) + val b = system.actorFor(a.path) + a must be === b + } + "only create one instance of an actor with a specific address in a concurrent environment" in { val impl = system.asInstanceOf[ActorSystemImpl] val provider = impl.provider provider.isInstanceOf[LocalActorRefProvider] must be(true) - (0 until 100) foreach { i ⇒ // 100 concurrent runs + for (i ← 0 until 100) { val address = "new-actor" + i implicit val timeout = Timeout(5 seconds) - ((1 to 4) map { _ ⇒ Future { provider.actorOf(impl, Props(c ⇒ { case _ ⇒ }), impl.guardian, address) } }).map(_.get).distinct.size must be(1) + val actors = for (j ← 1 to 4) yield Future(system.actorOf(Props(c ⇒ { case _ ⇒ }), address)) + val set = Set() ++ actors.map(_.await.value match { + case Some(Right(a: ActorRef)) ⇒ 1 + case Some(Left(ex: InvalidActorNameException)) ⇒ 2 + case x ⇒ x + }) + set must be === Set(1, 2) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b8c0bbb327..efaedcde49 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -77,6 +77,8 @@ class ActorKilledException private[akka] (message: String, cause: Throwable) def this(msg: String) = this(msg, null); } +case class InvalidActorNameException(message: String) extends AkkaException(message) + case class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable = null) extends AkkaException(message, cause) with NoStackTrace { def this(msg: String) = this(null, msg, null); diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 330824290f..8fc25d84bf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -97,6 +97,10 @@ private[akka] class ActorCell( var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs + protected def isDuplicate(name: String): Boolean = { + childrenRefs contains name + } + var currentMessage: Envelope = null var actor: Actor = _ @@ -152,8 +156,13 @@ private[akka] class ActorCell( final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child) - final def getChild(name: String): Option[ActorRef] = - if (isTerminated) None else childrenRefs.get(name).map(_.child) + final def getChild(name: String): ActorRef = + if (isTerminated) null + else { + val c = childrenRefs + if (c contains name) c(name).child + else null + } final def tell(message: Any, sender: ActorRef): Unit = dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)) @@ -360,9 +369,6 @@ private[akka] class ActorCell( } private def doTerminate() { - if (!system.provider.evict(self.path.toString)) - system.eventStream.publish(Warning(self.toString, "evict of " + self.path.toString + " failed")) - dispatcher.detach(this) try { diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index 11e735218a..4203c5c9b5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -5,63 +5,20 @@ package akka.actor import scala.annotation.tailrec object ActorPath { + // this cannot really be changed due to usage of standard URI syntax final val separator = "/" - - val pattern = """(/[0-9a-zA-Z\-\_\$\.]+)+""".r.pattern - - /** - * Create an actor path from a string. - */ - def apply(system: ActorSystem, path: String): ActorPath = - apply(system, split(path)) - - /** - * Create an actor path from an iterable. - */ - def apply(system: ActorSystem, path: Iterable[String]): ActorPath = - path.foldLeft(system.asInstanceOf[ActorSystemImpl].provider.rootPath)(_ / _) - - /** - * Split a string path into an iterable. - */ - def split(path: String): Iterable[String] = - if (path.startsWith(separator)) - path.substring(1).split(separator) - else - path.split(separator) - - /** - * Join an iterable path into a string. - */ - def join(path: Iterable[String]): String = - path.mkString(separator, separator, "") - - /** - * Is this string representation of a path valid? - */ - def valid(path: String): Boolean = - pattern.matcher(path).matches - - /** - * Validate a path. Moved here from Address.validate. - * Throws an IllegalArgumentException if the path is invalid. - */ - def validate(path: String): Unit = { - if (!valid(path)) - throw new IllegalArgumentException("Path [" + path + "] is not valid. Needs to follow this pattern: " + pattern) - } } /** * Actor path is a unique path to an actor that shows the creation path * up through the actor tree to the root actor. */ -trait ActorPath { +sealed trait ActorPath { /** * The Address under which this path can be reached; walks up the tree to * the RootActorPath. */ - def address: Address = root.address + def address: Address /** * The name of the actor that this path refers to. @@ -84,7 +41,7 @@ trait ActorPath { def /(child: Iterable[String]): ActorPath = (this /: child)(_ / _) /** - * Sequence of names for this path. + * Sequence of names for this path. Performance implication: has to allocate a list. */ def pathElements: Iterable[String] @@ -92,13 +49,14 @@ trait ActorPath { * Walk up the tree to obtain and return the RootActorPath. */ def root: RootActorPath + } /** * Root of the hierarchy of ActorPaths. There is exactly root per ActorSystem * and node (for remote-enabled or clustered systems). */ -class RootActorPath(override val address: Address, val name: String = ActorPath.separator) extends ActorPath { +final case class RootActorPath(address: Address, name: String = ActorPath.separator) extends ActorPath { def parent: ActorPath = this @@ -108,10 +66,12 @@ class RootActorPath(override val address: Address, val name: String = ActorPath. def pathElements: Iterable[String] = Iterable.empty - override val toString = address + ActorPath.separator + override val toString = address + name } -class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath { +final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath { + + def address: Address = root.address def /(child: String): ActorPath = new ChildActorPath(this, child) @@ -133,6 +93,12 @@ class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath rec(this) } + // TODO research whether this should be cached somehow (might be fast enough, but creates GC pressure) + /* + * idea: add one field which holds the total length (because that is known) + * so that only one String needs to be allocated before traversal; this is + * cheaper than any cache + */ override def toString = { @tailrec def rec(p: ActorPath, s: String): String = p match { @@ -142,5 +108,30 @@ class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath } rec(this, "") } + + override def equals(other: Any): Boolean = { + @tailrec + def rec(left: ActorPath, right: ActorPath): Boolean = + if (left eq right) true + else if (left.isInstanceOf[RootActorPath] || right.isInstanceOf[RootActorPath]) left == right + else left.name == right.name && rec(left.parent, right.parent) + + other match { + case p: ActorPath ⇒ rec(this, p) + case _ ⇒ false + } + } + + override def hashCode: Int = { + import scala.util.MurmurHash._ + + @tailrec + def rec(p: ActorPath, h: Int, c: Int, k: Int): Int = p match { + case r: RootActorPath ⇒ extendHash(h, r.##, c, k) + case _ ⇒ rec(p.parent, extendHash(h, stringHash(name), c, k), nextMagicA(c), nextMagicB(k)) + } + + finalizeHash(rec(this, startHash(42), startMagicA, startMagicB)) + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 101c69a81b..fc8c6f950a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -309,7 +309,7 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) { // FIXME this is broken, but see above def this(address: Address, path: String) = this(address.hostPort, 0, path) - def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path) + def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.host, remoteAddress.port, path) def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE @throws(classOf[java.io.ObjectStreamException]) @@ -351,6 +351,15 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { protected[akka] def restart(cause: Throwable): Unit = () } +object MinimalActorRef { + def apply(_path: ActorPath)(receive: PartialFunction[Any, Unit]): ActorRef = new MinimalActorRef { + def path = _path + def address = path.toString + override def !(message: Any)(implicit sender: ActorRef = null): Unit = + if (receive.isDefinedAt(message)) receive(message) + } +} + case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) object DeadLetterActorRef { @@ -399,7 +408,7 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { private def writeReplace(): AnyRef = DeadLetterActorRef.serialized } -abstract class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deathWatch: DeathWatch, timeout: Timeout, val dispatcher: MessageDispatcher) extends MinimalActorRef { +class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deathWatch: DeathWatch, timeout: Timeout, val dispatcher: MessageDispatcher) extends MinimalActorRef { final val result = new DefaultPromise[Any](timeout)(dispatcher) override def name = path.name @@ -412,7 +421,7 @@ abstract class AskActorRef(val path: ActorPath, provider: ActorRefProvider, deat result onTimeout callback } - protected def whenDone(): Unit + protected def whenDone(): Unit = {} override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case Status.Success(r) ⇒ result.completeWithResult(r) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 4ebf29a835..63d74b9db7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -17,21 +17,26 @@ import akka.AkkaException import com.eaio.uuid.UUID import akka.util.{ Duration, Switch, Helpers } import akka.remote.RemoteAddress -import akka.remote.LocalOnly +import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap /** * Interface for all ActorRef providers to implement. */ trait ActorRefProvider { - def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String): ActorRef = actorOf(system, props, supervisor, name, false) - - def actorFor(path: Iterable[String]): Option[ActorRef] - + /** + * Reference to the supervisor used for all top-level user actors. + */ def guardian: ActorRef + /** + * Reference to the supervisor used for all top-level system actors. + */ def systemGuardian: ActorRef + /** + * Reference to the death watch service. + */ def deathWatch: DeathWatch // FIXME: remove/replace? @@ -47,6 +52,12 @@ trait ActorRefProvider { def settings: ActorSystem.Settings + /** + * Initialization of an ActorRefProvider happens in two steps: first + * construction of the object with settings, eventStream, scheduler, etc. + * and then—when the ActorSystem is constructed—the second phase during + * which actors may be created (e.g. the guardians). + */ def init(system: ActorSystemImpl) private[akka] def deployer: Deployer @@ -54,21 +65,33 @@ trait ActorRefProvider { private[akka] def scheduler: Scheduler /** - * Create an Actor with the given name below the given supervisor. + * Actor factory with create-only semantics: will create an actor as + * described by props with the given supervisor and path (may be different + * in case of remote supervision). If systemService is true, deployment is + * bypassed (local-only). */ - private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef + def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean = false): ActorRef /** - * Create an Actor with the given full path below the given supervisor. - * - * FIXME: Remove! this is dangerous! + * Create actor reference for a specified local or remote path. If no such + * actor exists, it will be (equivalent to) a dead letter reference. */ - private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef + def actorFor(path: ActorPath): ActorRef /** - * Remove this path from the lookup map. + * Create actor reference for a specified local or remote path, which will + * be parsed using java.net.URI. If no such actor exists, it will be + * (equivalent to) a dead letter reference. */ - private[akka] def evict(path: String): Boolean + def actorFor(s: String): ActorRef + + /** + * Create actor reference for the specified child path starting at the root + * guardian. This method always returns an actor which is “logically local”, + * i.e. it cannot be used to obtain a reference to an actor which is not + * physically or logically attached to this actor system. + */ + def actorFor(p: Iterable[String]): ActorRef private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] @@ -106,16 +129,20 @@ trait ActorRefFactory { protected def randomName(): String + /** + * Child names must be unique within the context of one parent; implement + * this method to have the default implementation of actorOf perform the + * check (and throw an exception if necessary). + */ + protected def isDuplicate(name: String): Boolean + def actorOf(props: Props): ActorRef = provider.actorOf(systemImpl, props, guardian, randomName(), false) - /* - * TODO this will have to go at some point, because creating two actors with - * the same address can race on the cluster, and then you never know which - * implementation wins - */ def actorOf(props: Props, name: String): ActorRef = { if (name == null || name == "" || name.startsWith("$")) - throw new ActorInitializationException("actor name must not be null, empty or start with $") + throw new InvalidActorNameException("actor name must not be null, empty or start with $") + if (isDuplicate(name)) + throw new InvalidActorNameException("actor name " + name + " is not unique!") provider.actorOf(systemImpl, props, guardian, name, false) } @@ -130,11 +157,13 @@ trait ActorRefFactory { def actorOf(creator: UntypedActorFactory): ActorRef = actorOf(Props(() ⇒ creator.create())) - def actorFor(path: ActorPath): Option[ActorRef] = actorFor(path.pathElements) + def actorOf(creator: UntypedActorFactory, name: String): ActorRef = actorOf(Props(() ⇒ creator.create()), name) - def actorFor(path: String): Option[ActorRef] = actorFor(ActorPath.split(path)) + def actorFor(path: ActorPath) = provider.actorFor(path) - def actorFor(path: Iterable[String]): Option[ActorRef] = provider.actorFor(path) + def actorFor(path: String): ActorRef = provider.actorFor(path) + + def actorFor(path: Iterable[String]): ActorRef = provider.actorFor(path) } class ActorRefProviderException(message: String) extends AkkaException(message) @@ -143,16 +172,17 @@ class ActorRefProviderException(message: String) extends AkkaException(message) * Local ActorRef provider. */ class LocalActorRefProvider( + _systemName: String, val settings: ActorSystem.Settings, val eventStream: EventStream, val scheduler: Scheduler, - val rootPath: ActorPath, - val nodename: String, - val clustername: String) extends ActorRefProvider { + val deadLetters: ActorRef) extends ActorRefProvider { - def this(settings: ActorSystem.Settings, eventStream: EventStream, scheduler: Scheduler) { - this(settings, eventStream, scheduler, new RootActorPath(LocalOnly), "local", "local") - } + val rootPath: ActorPath = new RootActorPath(LocalAddress(_systemName)) + + // FIXME remove both + val nodename: String = "local" + val clustername: String = "local" val log = Logging(eventStream, "LocalActorRefProvider") @@ -162,14 +192,10 @@ class LocalActorRefProvider( * generate name for temporary actor refs */ private val tempNumber = new AtomicLong - def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement()) + private def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement()) private val tempNode = rootPath / "tmp" def tempPath = tempNode / tempName - // FIXME (actor path): this could become a cache for the new tree traversal actorFor - // currently still used for tmp actors (e.g. ask actor refs) - private val actors = new ConcurrentHashMap[String, AnyRef] - /** * Top-level anchor for the supervision hierarchy of this actor system. Will * receive only Supervise/ChildTerminated system messages or Failure message. @@ -240,7 +266,7 @@ class LocalActorRefProvider( private var system: ActorSystemImpl = _ def dispatcher: MessageDispatcher = system.dispatcher lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher) - lazy val rootGuardian: ActorRef = actorOf(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) + lazy val rootGuardian: ActorRef = new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) lazy val guardian: ActorRef = actorOf(system, guardianProps, rootGuardian, "app", true) lazy val systemGuardian: ActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true) @@ -253,88 +279,58 @@ class LocalActorRefProvider( deathWatch.subscribe(rootGuardian, systemGuardian) } - // FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "system" name for now) - def actorFor(path: Iterable[String]): Option[ActorRef] = findInCache(ActorPath.join(path)) orElse findInTree(Some(guardian), path.tail) + def actorFor(path: String): ActorRef = path match { + case LocalActorPath(address, elems) if address == rootPath.address ⇒ + findInTree(rootGuardian.asInstanceOf[LocalActorRef], elems) + case _ ⇒ deadLetters + } + + def actorFor(path: ActorPath): ActorRef = findInTree(rootGuardian.asInstanceOf[LocalActorRef], path.pathElements) + + def actorFor(path: Iterable[String]): ActorRef = findInTree(rootGuardian.asInstanceOf[LocalActorRef], path) @tailrec - private def findInTree(start: Option[ActorRef], path: Iterable[String]): Option[ActorRef] = { + private def findInTree(start: LocalActorRef, path: Iterable[String]): ActorRef = { if (path.isEmpty) start - else { - val child = start match { - case Some(local: LocalActorRef) ⇒ local.underlying.getChild(path.head) - case _ ⇒ None - } - findInTree(child, path.tail) + else start.underlying.getChild(path.head) match { + case null ⇒ deadLetters + case child: LocalActorRef ⇒ findInTree(child, path.tail) + case _ ⇒ deadLetters } } - private def findInCache(path: String): Option[ActorRef] = actors.get(path) match { - case null ⇒ None - case actor: ActorRef ⇒ Some(actor) - case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) - } + def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = { + val path = supervisor.path / name + (if (systemService) None else deployer.lookupDeployment(path.toString)) match { - /** - * Returns true if the actor was in the provider's cache and evicted successfully, else false. - */ - private[akka] def evict(path: String): Boolean = actors.remove(path) ne null + // create a local actor + case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒ + new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor - private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = - actorOf(system, props, supervisor, supervisor.path / name, systemService) + // create a routed actor ref + case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒ - private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = { - val name = path.name - val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout? - - actors.putIfAbsent(path.toString, newFuture) match { - case null ⇒ - val actor: ActorRef = try { - (if (systemService) None else deployer.lookupDeployment(path.toString)) match { // see if the deployment already exists, if so use it, if not create actor - - // create a local actor - case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒ - new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor - - // create a routed actor ref - case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒ - - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ () ⇒ new DirectRouter - case RouterType.Random ⇒ () ⇒ new RandomRouter - case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter - case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()( - if (props.dispatcher == Props.defaultDispatcher) dispatcher else props.dispatcher, settings.ActorTimeout) - case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) - } - - val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒ - val routedPath = path.parent / (path.name + ":" + i) - new LocalActorRef(system, props, supervisor, routedPath, systemService) - } - - actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString) - - case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown) - } - } catch { - case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error - //TODO FIXME should we remove the mapping in "actors" here? - throw e + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + case RouterType.Direct ⇒ () ⇒ new DirectRouter + case RouterType.Random ⇒ () ⇒ new RandomRouter + case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter + case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()( + if (props.dispatcher == Props.defaultDispatcher) dispatcher else props.dispatcher, settings.ActorTimeout) + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) } - newFuture completeWithResult actor - actors.replace(path.toString, newFuture, actor) - actor - case actor: ActorRef ⇒ - actor - case future: Future[_] ⇒ - future.get.asInstanceOf[ActorRef] - } + val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒ + val routedPath = path.parent / (path.name + ":" + i) + new LocalActorRef(system, props, supervisor, routedPath, systemService) + } + actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString) + + case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown) + } } /** @@ -356,7 +352,7 @@ class LocalActorRefProvider( new RoutedActorRef(system, props, supervisor, name) } - private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(ActorPath.split(actor.path)) + private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = Some(actorFor(actor.path)) private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(rootPath.address, actor.path.toString) private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch @@ -367,8 +363,7 @@ class LocalActorRefProvider( case t if t.duration.length <= 0 ⇒ new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout case t ⇒ - val a = new AskActorRef(tempPath, this, deathWatch, t, dispatcher) { def whenDone() = actors.remove(this) } - assert(actors.putIfAbsent(a.path.toString, a) eq null) //If this fails, we're in deep trouble + val a = new AskActorRef(tempPath, this, deathWatch, t, dispatcher) recipient.tell(message, a) a.result } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 1bf1ea2bd1..1aec26cfc2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -55,7 +55,7 @@ object ActorSystem { def create(): ActorSystem = apply() def apply(): ActorSystem = apply("default") - class Settings(cfg: Config) { + class Settings(cfg: Config, val name: String) { private def referenceConfig: Config = ConfigFactory.parseResource(classOf[ActorSystem], "/akka-actor-reference.conf", ConfigParseOptions.defaults.setAllowMissing(false)) @@ -287,11 +287,34 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A import ActorSystem._ - val settings = new Settings(applicationConfig) + val settings = new Settings(applicationConfig, name) protected def systemImpl = this - private[akka] def systemActorOf(props: Props, address: String): ActorRef = provider.actorOf(this, props, systemGuardian, address, true) + private val systemActors = new ConcurrentHashMap[String, ActorRef] + + private[akka] def systemActorOf(props: Props, name: String): ActorRef = { + if (systemActors.putIfAbsent(name, deadLetters) eq null) { + val actor = provider.actorOf(this, props, systemGuardian, name, true) + systemActors.replace(name, actor) + deathWatch.subscribe(systemActorsJanitor, actor) + actor + } else throw new InvalidActorNameException("system actor name " + name + " is not unique!") + } + + private val actors = new ConcurrentHashMap[String, ActorRef] + + protected def isDuplicate(name: String): Boolean = { + actors.putIfAbsent(name, deadLetters) ne null + } + + override def actorOf(props: Props, name: String): ActorRef = { + val actor = super.actorOf(props, name) + // this would have thrown an exception in case of a duplicate name + actors.replace(name, actor) + deathWatch.subscribe(actorsJanitor, actor) + actor + } import settings._ @@ -302,25 +325,6 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)) - val provider: ActorRefProvider = { - val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { - case Left(e) ⇒ throw e - case Right(b) ⇒ b - } - val arguments = Seq( - classOf[Settings] -> settings, - classOf[EventStream] -> eventStream, - classOf[Scheduler] -> scheduler) - val types: Array[Class[_]] = arguments map (_._1) toArray - val values: Array[AnyRef] = arguments map (_._2) toArray - - ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match { - case Left(e: InvocationTargetException) ⇒ throw e.getTargetException - case Left(e) ⇒ throw e - case Right(p) ⇒ p - } - } - val deadLetters = new DeadLetterActorRef(eventStream) val deadLetterMailbox = new Mailbox(null) { becomeClosed() @@ -333,6 +337,35 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A override def numberOfMessages = 0 } + val provider: ActorRefProvider = { + val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { + case Left(e) ⇒ throw e + case Right(b) ⇒ b + } + val arguments = Seq( + classOf[String] -> name, + classOf[Settings] -> settings, + classOf[EventStream] -> eventStream, + classOf[Scheduler] -> scheduler, + classOf[ActorRef] -> deadLetters) + val types: Array[Class[_]] = arguments map (_._1) toArray + val values: Array[AnyRef] = arguments map (_._2) toArray + + ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match { + case Left(e: InvocationTargetException) ⇒ throw e.getTargetException + case Left(e) ⇒ throw e + case Right(p) ⇒ p + } + } + + val actorsJanitor = MinimalActorRef(provider.rootPath) { + case Terminated(x) ⇒ actors.remove(x.path.name) + } + + val systemActorsJanitor = MinimalActorRef(provider.rootPath) { + case Terminated(x) ⇒ systemActors.remove(x.path.name) + } + val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index c405702f6e..bda9fd6048 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -2,6 +2,8 @@ * Copyright (C) 2009-2011 Typesafe Inc. */ package akka.actor +import java.net.URI +import java.net.URISyntaxException /** * The address specifies the physical location under which an Actor can be @@ -18,4 +20,19 @@ abstract class Address { case class LocalAddress(systemName: String) extends Address { def protocol = "akka" def hostPort = systemName +} + +object LocalActorPath { + def unapply(addr: String): Option[(LocalAddress, Iterable[String])] = { + try { + val uri = new URI(addr) + if (uri.getScheme != "akka") return None + if (uri.getUserInfo != null) return None + if (uri.getHost == null) return None + if (uri.getPath == null) return None + Some(LocalAddress(uri.getHost), uri.getPath.split("/").drop(1)) + } catch { + case _: URISyntaxException ⇒ None + } + } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index ec1d8dfc4c..3bb7338d8d 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -182,7 +182,7 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, } if (port == 0) raiseRemoteNodeParsingError() - RemoteAddress(new InetSocketAddress(hostname, port)) + RemoteAddress(settings.name, hostname, port) } RemoteScope(remoteAddresses) diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index 3906d2cb04..647fe2336c 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -5,9 +5,11 @@ package akka.event import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated, ActorSystem, simpleName } import akka.util.Subclassification +import java.util.concurrent.atomic.AtomicInteger object EventStream { implicit def fromActorSystem(system: ActorSystem) = system.eventStream + val generation = new AtomicInteger } class A(x: Int = 0) extends Exception("x=" + x) @@ -52,8 +54,12 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas case ref: ActorRef ⇒ watch(ref) case Terminated(ref) ⇒ unsubscribe(ref) } - }), "MainBusReaper") + }), "MainBusReaper-" + EventStream.generation.incrementAndGet()) subscribers foreach (reaper ! _) } + def stop() { + reaper.stop() + } + } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index 4acf0e37c7..64a45fc9d7 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -6,32 +6,40 @@ package akka.remote import akka.actor._ import akka.AkkaException - import scala.reflect.BeanProperty import java.io.{ PrintWriter, PrintStream } - import java.net.InetSocketAddress +import java.net.URI +import java.net.URISyntaxException +import java.net.InetAddress object RemoteAddress { - def apply(host: String, port: Int): RemoteAddress = apply(new InetSocketAddress(host, port)) - def apply(inetAddress: InetSocketAddress): RemoteAddress = inetAddress match { - case null ⇒ null - case inet ⇒ - val host = inet.getAddress match { - case null ⇒ inet.getHostName //Fall back to given name - case other ⇒ other.getHostAddress - } - val portNo = inet.getPort - RemoteAddress(portNo, host) + def apply(system: String, host: String, port: Int) = { + val ip = InetAddress.getByName(host) + new RemoteAddress(system, host, ip, port) } } -object LocalOnly extends RemoteAddress(0, "local") - -case class RemoteAddress private[akka] (port: Int, hostname: String) extends Address { +case class RemoteAddress(system: String, host: String, ip: InetAddress, port: Int) extends Address { def protocol = "akka" @transient - lazy val hostPort = hostname + ":" + port + lazy val hostPort = system + "@" + host + ":" + port +} + +object RemoteActorPath { + def unapply(addr: String): Option[(RemoteAddress, Iterable[String])] = { + try { + val uri = new URI(addr) + if (uri.getScheme != "akka") return None + if (uri.getUserInfo == null) return None + if (uri.getHost == null) return None + if (uri.getPort == -1) return None + if (uri.getPath == null) return None + Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort), uri.getPath.split("/").drop(1)) + } catch { + case _: URISyntaxException ⇒ None + } + } } class RemoteException(message: String) extends AkkaException(message) diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index 3376ad9416..c6e77c3416 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -63,9 +63,9 @@ class NetworkEventStream(system: ActorSystemImpl) { import NetworkEventStream._ // FIXME: check that this supervision is correct - private[akka] val sender = system.provider.actorOf(system, - Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), - system.systemGuardian, "network-event-sender", systemService = true) + private[akka] val sender = + system.systemActorOf(Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), + "network-event-sender") /** * Registers a network event stream listener (asyncronously). diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index ecdc5d39a1..1a3bc27b0b 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -38,7 +38,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { private[remote] val remoteExtension = RemoteExtension(system) private[remote] val serializationExtension = SerializationExtension(system) private[remote] val remoteAddress = { - RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) + RemoteAddress(system.name, remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) } val failureDetector = new AccrualFailureDetector(system) @@ -141,13 +141,17 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor] } - val actorPath = ActorPath(systemImpl, message.getActorPath) - val parent = system.actorFor(actorPath.parent) - - if (parent.isDefined) { - systemImpl.provider.actorOf(systemImpl, Props(creator = actorFactory), parent.get, actorPath.name) - } else { - log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message) + message.getActorPath match { + case RemoteActorPath(addr, elems) if addr == remoteAddress && elems.size > 0 ⇒ + val name = elems.last + system.actorFor(elems.dropRight(1)) match { + case x if x eq system.deadLetters ⇒ + log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message) + case parent ⇒ + systemImpl.provider.actorOf(systemImpl, Props(creator = actorFactory), parent, name) + } + case _ ⇒ + log.error("remote path does not match path from message [{}]", message) } } else { @@ -251,7 +255,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo else remote.system.deadLetters - lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath).getOrElse(remote.system.deadLetters) + lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath) lazy val payload: Either[Throwable, AnyRef] = if (input.hasException) Left(parseException()) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a34f975d57..4d5e6f7adc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -30,18 +30,19 @@ import akka.serialization.SerializationExtension * @author Jonas Bonér */ class RemoteActorRefProvider( + val systemName: String, val settings: ActorSystem.Settings, val eventStream: EventStream, - val scheduler: Scheduler) extends ActorRefProvider { + val scheduler: Scheduler, + _deadLetters: ActorRef) extends ActorRefProvider { val log = Logging(eventStream, "RemoteActorRefProvider") def deathWatch = local.deathWatch def guardian = local.guardian def systemGuardian = local.systemGuardian - def nodename = local.nodename - def clustername = local.clustername - def tempName = local.tempName + def nodename = remoteExtension.settings.NodeName + def clustername = remoteExtension.settings.ClusterName private val actors = new ConcurrentHashMap[String, AnyRef] @@ -57,11 +58,10 @@ class RemoteActorRefProvider( private lazy val remoteExtension = RemoteExtension(system) private lazy val serializationExtension = SerializationExtension(system) lazy val rootPath: ActorPath = { - val remoteAddress = RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) + val remoteAddress = RemoteAddress(system.name, remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) new RootActorPath(remoteAddress) } - private lazy val local = new LocalActorRefProvider(settings, eventStream, scheduler, rootPath, - remoteExtension.settings.NodeName, remoteExtension.settings.ClusterName) + private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters) private[akka] lazy val remote = new Remote(system, nodename) private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) @@ -79,13 +79,10 @@ class RemoteActorRefProvider( def dispatcher = local.dispatcher def defaultTimeout = settings.ActorTimeout - private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = - actorOf(system, props, supervisor, supervisor.path / name, systemService) - - private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = - if (systemService) local.actorOf(system, props, supervisor, path, systemService) + def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = + if (systemService) local.actorOf(system, props, supervisor, name, systemService) else { - val name = path.name + val path = supervisor.path / name val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout? actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future @@ -144,7 +141,7 @@ class RemoteActorRefProvider( } val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ - val remoteAddress = RemoteAddress(a.hostname, a.port) + val remoteAddress = RemoteAddress(system.name, a.host, a.port) conns + (remoteAddress -> RemoteActorRef(remote.system.provider, remote.server, remoteAddress, path, None)) } @@ -182,11 +179,9 @@ class RemoteActorRefProvider( new RoutedActorRef(system, props, supervisor, name) } - def actorFor(path: Iterable[String]): Option[ActorRef] = actors.get(ActorPath.join(path)) match { - case null ⇒ local.actorFor(path) - case actor: ActorRef ⇒ Some(actor) - case future: Future[_] ⇒ Some(future.get.asInstanceOf[ActorRef]) - } + def actorFor(path: ActorPath): ActorRef = local.actorFor(path) + def actorFor(path: String): ActorRef = local.actorFor(path) + def actorFor(path: Iterable[String]): ActorRef = local.actorFor(path) // TODO remove me val optimizeLocal = new AtomicBoolean(true) @@ -195,7 +190,7 @@ class RemoteActorRefProvider( /** * Returns true if the actor was in the provider's cache and evicted successfully, else false. */ - private[akka] def evict(path: String): Boolean = actors.remove(path) ne null + private[akka] def evict(path: ActorPath): Boolean = actors.remove(path) ne null private[akka] def serialize(actor: ActorRef): SerializedActorRef = actor match { case r: RemoteActorRef ⇒ new SerializedActorRef(r.remoteAddress, actor.path.toString) @@ -203,12 +198,12 @@ class RemoteActorRefProvider( } private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = { - val remoteAddress = RemoteAddress(actor.hostname, actor.port) + val remoteAddress = RemoteAddress(systemName, actor.hostname, actor.port) if (optimizeLocalScoped_? && remoteAddress == remote.remoteAddress) { - local.actorFor(ActorPath.split(actor.path)) + Some(local.actorFor(actor.path)) } else { log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", remote.remoteAddress, actor.path, remoteAddress) - Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here + Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / actor.path, None)) // FIXME I know, this is broken } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 497976cccf..43b9cd45b6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -152,7 +152,7 @@ class ActiveRemoteClient private[akka] ( val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get) handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setHostname(senderRemoteAddress.hostname) + .setHostname(senderRemoteAddress.host) .setPort(senderRemoteAddress.port) .build) connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build)) @@ -166,7 +166,7 @@ class ActiveRemoteClient private[akka] ( def attemptReconnect(): Boolean = { log.debug("Remote client reconnecting to [{}]", remoteAddress) - val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port)) + val connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip, remoteAddress.port)) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { @@ -189,7 +189,7 @@ class ActiveRemoteClient private[akka] ( log.debug("Starting remote client connection to [{}]", remoteAddress) - connection = bootstrap.connect(new InetSocketAddress(remoteAddress.hostname, remoteAddress.port)) + connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip, remoteAddress.port)) val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) @@ -512,7 +512,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) - openChannels.add(bootstrap.bind(new InetSocketAddress(address.hostname, address.port))) + openChannels.add(bootstrap.bind(new InetSocketAddress(address.ip, address.port))) remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) def shutdown() { @@ -645,7 +645,8 @@ class RemoteServerHandler( instruction.getCommandType match { case CommandType.CONNECT if UsePassiveConnections ⇒ val origin = instruction.getOrigin - val inbound = RemoteAddress(origin.getHostname, origin.getPort) + // FIXME need to include system-name in remote protocol + val inbound = RemoteAddress("BORKED", origin.getHostname, origin.getPort) val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound) remoteSupport.bindClient(inbound, client) case CommandType.SHUTDOWN ⇒ //TODO FIXME Dispose passive connection here @@ -664,7 +665,7 @@ class RemoteServerHandler( private def getClientAddress(c: Channel): Option[RemoteAddress] = c.getRemoteAddress match { - case inet: InetSocketAddress ⇒ Some(RemoteAddress(inet)) + case inet: InetSocketAddress ⇒ Some(RemoteAddress("BORKED", inet.getHostName, inet.getPort)) // FIXME Broken! case _ ⇒ None } } diff --git a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala index 38d18ac6c5..94e2c0272c 100644 --- a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala @@ -6,7 +6,7 @@ import akka.testkit.AkkaSpec class AccrualFailureDetectorSpec extends AkkaSpec { "An AccrualFailureDetector" must { - val conn = RemoteAddress(new InetSocketAddress("localhost", 2552)) + val conn = RemoteAddress("tester", "localhost", 2552) "mark node as available after a series of successful heartbeats" in { val fd = new AccrualFailureDetector