From 5f918e55bc7f6e26585fbf72b3e10289f5d92304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 8 Apr 2011 15:29:14 +0200 Subject: [PATCH] commit in the middle of address refactoring --- .../akka/actor/supervisor/Ticket669Spec.scala | 4 +- .../scala/akka/dataflow/DataFlowSpec.scala | 4 +- .../scala/akka/misc/ActorRegistrySpec.scala | 165 ++---- .../test/scala/akka/misc/SchedulerSpec.scala | 6 +- .../src/main/scala/akka/actor/Actor.scala | 31 +- .../src/main/scala/akka/actor/ActorRef.scala | 22 +- .../main/scala/akka/actor/ActorRegistry.scala | 326 ++++++------ .../actor/BootableActorLoaderService.scala | 2 +- .../main/scala/akka/actor/UntypedActor.scala | 2 +- .../scala/akka/dispatch/MessageHandling.scala | 2 +- .../remoteinterface/RemoteEventHandler.scala | 2 +- .../remoteinterface/RemoteInterface.scala | 8 +- .../scala/akka/util/ReflectiveAccess.scala | 86 ++-- akka-http/src/main/scala/akka/http/Mist.scala | 4 +- .../main/scala/akka/security/Security.scala | 8 +- .../akka/remote/protocol/RemoteProtocol.java | 470 +++++++----------- .../src/main/protocol/RemoteProtocol.proto | 30 +- .../remote/BootableRemoteActorService.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 38 +- .../serialization/SerializationProtocol.scala | 9 +- .../test/scala/remote/AkkaRemoteTest.scala | 2 +- .../RemoteErrorHandlingNetworkTest.scala | 2 +- .../scala/remote/RemoteSupervisorSpec.scala | 2 +- .../scala/remote/RemoteTypedActorSpec.scala | 2 +- .../ServerInitiatedRemoteActorSpec.scala | 10 +- .../ServerInitiatedRemoteTypedActorSpec.scala | 4 +- .../main/scala/akka/actor/TypedActor.scala | 20 +- .../actor/typed-actor/Issue675Spec.scala | 2 +- .../typed-actor/TypedActorRegistrySpec.scala | 56 --- .../actor/typed-actor/TypedActorSpec.scala | 65 +-- 30 files changed, 588 insertions(+), 798 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index 54c8179152..c750be9631 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -14,7 +14,7 @@ import org.scalatest.matchers.MustMatchers class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { import Ticket669Spec._ - override def afterAll = Actor.registry.shutdownAll + override def afterAll = Actor.registry.local.shutdownAll "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { @@ -69,4 +69,4 @@ object Ticket669Spec { self.reply_?("failure2") } } -} \ No newline at end of file +} diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala index f5a107f511..4e3bebf663 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala @@ -72,7 +72,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { /* it("should be able to join streams") { import DataFlow._ - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { stream <<< n @@ -139,7 +139,7 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { /* it("should be able to conditionally set variables") { import DataFlow._ - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll val latch = new CountDownLatch(1) val x, y, z, v = new DataFlowVariable[Int] diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala index 09a23dbc5c..4030b953db 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -32,68 +32,45 @@ object ActorRegistrySpec { class ActorRegistrySpec extends JUnitSuite { import ActorRegistrySpec._ - @Test def shouldGetActorByIdFromActorRegistry { - Actor.registry.shutdownAll - val actor = actorOf[TestActor] - actor.start - val actors = Actor.registry.actorsFor("MyID") - assert(actors.size === 1) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - actor.stop + @Test def shouldGetActorByAddressFromActorRegistry { + Actor.registry.local.shutdownAll + val actor1 = actorOf[TestActor] + actor1.start + val actor2 = Actor.registry.actorFor(actor1.address) + assert(actor2.isDefined) + assert(actor2.get.address === actor1.address) + actor2.get.stop } - @Test def shouldGetActorByUUIDFromActorRegistry { - Actor.registry.shutdownAll + @Test def shouldGetActorByUUIDFromLocalActorRegistry { + Actor.registry.local.shutdownAll val actor = actorOf[TestActor] val uuid = actor.uuid actor.start - val actorOrNone = Actor.registry.actorFor(uuid) + val actorOrNone = Actor.registry.local.actorFor(uuid) assert(actorOrNone.isDefined) assert(actorOrNone.get.uuid === uuid) actor.stop } - @Test def shouldGetActorByClassFromActorRegistry { - Actor.registry.shutdownAll + @Test def shouldFindThingsFromLocalActorRegistry { + Actor.registry.local.shutdownAll val actor = actorOf[TestActor] actor.start - val actors = Actor.registry.actorsFor(classOf[TestActor]) - assert(actors.size === 1) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - actor.stop - } - - @Test def shouldGetActorByManifestFromActorRegistry { - Actor.registry.shutdownAll - val actor = actorOf[TestActor] - actor.start - val actors = Actor.registry.actorsFor[TestActor] - assert(actors.size === 1) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - actor.stop - } - - @Test def shouldFindThingsFromActorRegistry { - Actor.registry.shutdownAll - val actor = actorOf[TestActor] - actor.start - val found = Actor.registry.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a }) + val found = Actor.registry.local.find({ case a: ActorRef if a.actor.isInstanceOf[TestActor] => a }) assert(found.isDefined) assert(found.get.actor.isInstanceOf[TestActor]) assert(found.get.id === "MyID") actor.stop } - @Test def shouldGetActorsByIdFromActorRegistry { - Actor.registry.shutdownAll + @Test def shouldGetAllActorsFromLocalActorRegistry { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start - val actors = Actor.registry.actorsFor("MyID") + val actors = Actor.registry.local.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) assert(actors.head.id === "MyID") @@ -103,120 +80,45 @@ class ActorRegistrySpec extends JUnitSuite { actor2.stop } - @Test def shouldGetActorsByClassFromActorRegistry { - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor] - actor2.start - val actors = Actor.registry.actorsFor(classOf[TestActor]) - assert(actors.size === 2) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.id === "MyID") - actor1.stop - actor2.stop - } - - @Test def shouldGetActorsByManifestFromActorRegistry { - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor] - actor2.start - val actors = Actor.registry.actorsFor[TestActor] - assert(actors.size === 2) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.id === "MyID") - actor1.stop - actor2.stop - } - - @Test def shouldGetActorsByMessageFromActorRegistry { - - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor2] - actor2.start - - val actorsForAcotrTestActor = Actor.registry.actorsFor[TestActor] - assert(actorsForAcotrTestActor.size === 1) - - val actorsForAcotrTestActor2 = Actor.registry.actorsFor[TestActor2] - assert(actorsForAcotrTestActor2.size === 1) - - val actorsForAcotr = Actor.registry.actorsFor[Actor] - assert(actorsForAcotr.size === 2) - - - val actorsForMessagePing2 = Actor.registry.actorsFor[Actor]("ping2") - assert(actorsForMessagePing2.size === 1) - - val actorsForMessagePing = Actor.registry.actorsFor[Actor]("ping") - assert(actorsForMessagePing.size === 2) - - actor1.stop - actor2.stop - } - - @Test def shouldGetAllActorsFromActorRegistry { - Actor.registry.shutdownAll - val actor1 = actorOf[TestActor] - actor1.start - val actor2 = actorOf[TestActor] - actor2.start - val actors = Actor.registry.actors - assert(actors.size === 2) - assert(actors.head.actor.isInstanceOf[TestActor]) - assert(actors.head.id === "MyID") - assert(actors.last.actor.isInstanceOf[TestActor]) - assert(actors.last.id === "MyID") - actor1.stop - actor2.stop - } - - @Test def shouldGetResponseByAllActorsInActorRegistryWhenInvokingForeach { - Actor.registry.shutdownAll + @Test def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start record = "" - Actor.registry.foreach(actor => actor !! "ping") + Actor.registry.local.foreach(actor => actor !! "ping") assert(record === "pongpong") actor1.stop actor2.stop } - @Test def shouldShutdownAllActorsInActorRegistry { - Actor.registry.shutdownAll + @Test def shouldShutdownAllActorsInLocalActorRegistry { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start - Actor.registry.shutdownAll - assert(Actor.registry.actors.size === 0) + Actor.registry.local.shutdownAll + assert(Actor.registry.local.actors.size === 0) } - @Test def shouldRemoveUnregisterActorInActorRegistry { - Actor.registry.shutdownAll + @Test def shouldRemoveUnregisterActorInLocalActorRegistry { + Actor.registry.local.shutdownAll val actor1 = actorOf[TestActor] actor1.start val actor2 = actorOf[TestActor] actor2.start - assert(Actor.registry.actors.size === 2) + assert(Actor.registry.local.actors.size === 2) Actor.registry.unregister(actor1) - assert(Actor.registry.actors.size === 1) + assert(Actor.registry.local.actors.size === 1) Actor.registry.unregister(actor2) - assert(Actor.registry.actors.size === 0) + assert(Actor.registry.local.actors.size === 0) } + /* @Test def shouldBeAbleToRegisterActorsConcurrently { - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll def mkTestActors = for(i <- (1 to 10).toList;j <- 1 to 3000) yield actorOf( new Actor { self.id = i.toString @@ -244,9 +146,10 @@ class ActorRegistrySpec extends JUnitSuite { for(i <- 1 to 10) { val theId = i.toString - val actors = Actor.registry.actorsFor(theId).toSet - for(a <- actors if a.id == theId) assert(actors contains a) + val actor = Actor.registry.local.actorFor(theId) + assert(actor eq a) assert(actors.size === 9000) } } + */ } diff --git a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala index 79b09d49d1..77b84e130a 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala @@ -12,7 +12,7 @@ class SchedulerSpec extends JUnitSuite { def withCleanEndState(action: => Unit) { action Scheduler.restart - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } @@ -62,10 +62,10 @@ class SchedulerSpec extends JUnitSuite { val actor = actorOf(new Actor { def receive = { case Ping => ticks.countDown } }).start - val numActors = Actor.registry.actors.length + val numActors = Actor.registry.local.actors.length (1 to 1000).foreach( _ => Scheduler.scheduleOnce(actor,Ping,1,TimeUnit.MILLISECONDS) ) assert(ticks.await(10,TimeUnit.SECONDS)) - assert(Actor.registry.actors.length === numActors) + assert(Actor.registry.local.actors.length === numActors) } /** diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 718c3b94bb..00a280188a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -14,6 +14,7 @@ import scala.reflect.BeanProperty import akka.util. {ReflectiveAccess, Duration} import akka.remoteinterface.RemoteSupport import akka.japi. {Creator, Procedure} +import com.eaio.uuid.UUID /** * Life-cycle messages for the Actors @@ -110,7 +111,7 @@ object Actor extends ListenerManagement { lazy val remote: RemoteSupport = { ReflectiveAccess - .Remote + .RemoteModule .defaultRemoteSupport .map(_()) .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath")) @@ -141,7 +142,11 @@ object Actor extends ListenerManagement { * val actor = actorOf[MyActor].start * */ - def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + def actorOf[T <: Actor : Manifest](address: String): ActorRef = + actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) + + def actorOf[T <: Actor : Manifest]: ActorRef = + actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], (new UUID).toString) /** * Creates an ActorRef out of the Actor of the specified Class. @@ -157,7 +162,7 @@ object Actor extends ListenerManagement { * val actor = actorOf(classOf[MyActor]).start * */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { + def actorOf(clazz: Class[_ <: Actor], address: String): ActorRef = new LocalActorRef(() => { import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse( throw new ActorInitializationException( @@ -165,7 +170,9 @@ object Actor extends ListenerManagement { "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.")) - }, None) + }, None, false, address) + + def actorOf(clazz: Class[_ <: Actor]): ActorRef = actorOf(clazz, (new UUID).toString) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory function @@ -185,7 +192,9 @@ object Actor extends ListenerManagement { * val actor = actorOf(new MyActor).start * */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None) + def actorOf(factory: => Actor, address: String): ActorRef = new LocalActorRef(() => factory, None, false, address) + + def actorOf(factory: => Actor): ActorRef = actorOf(factory, (new UUID).toString) /** * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) @@ -195,7 +204,17 @@ object Actor extends ListenerManagement { * This function should NOT be used for remote actors. * JAVA API */ - def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None) + def actorOf(creator: Creator[Actor], address: String): ActorRef = new LocalActorRef(() => creator.create, None, false, address) + + /** + * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator) + * that creates the Actor. Please note that this function can be invoked multiple + * times if for example the Actor is supervised and needs to be restarted. + *

+ * This function should NOT be used for remote actors. + * JAVA API + */ + def actorOf(creator: Creator[Actor]): ActorRef = actorOf(creator, (new UUID).toString) /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index cb28a07407..f17f0dfbe9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -106,6 +106,13 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal @volatile var id: String = _uuid.toString + /** + * FIXME Document + */ + @BeanProperty + @volatile + var address: String = _uuid.toString // FIXME set 'address' in ActorRef and make 'val' + /** * User overridable callback/setting. *

@@ -596,8 +603,10 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal class LocalActorRef private[akka] ( private[this] val actorFactory: () => Actor, val homeAddress: Option[InetSocketAddress], - val clientManaged: Boolean = false) + val clientManaged: Boolean, + _address: String) extends ActorRef with ScalaActorRef { + this.address = _address @volatile private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] @@ -627,8 +636,9 @@ class LocalActorRef private[akka] ( __supervisor: Option[ActorRef], __hotswap: Stack[PartialFunction[Any, Unit]], __factory: () => Actor, - __homeAddress: Option[InetSocketAddress]) = { - this(__factory, __homeAddress) + __homeAddress: Option[InetSocketAddress], + __address: String) = { + this(__factory, __homeAddress, false, __address) _uuid = __uuid id = __id timeout = __timeout @@ -1207,6 +1217,8 @@ private[akka] case class RemoteActorRef private[akka] ( * //superclass, which ActorRefShared is. */ trait ActorRefShared { + def address: String + /** * Returns the uuid for the actor. */ @@ -1232,6 +1244,10 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => def id_=(id: String): Unit + def address: String + + def address_=(address: String): Unit + /** * User overridable callback/setting. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 03bef32d25..2bdfe08dcc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -4,15 +4,15 @@ package akka.actor -import scala.collection.mutable.{ListBuffer, Map} +import scala.collection.mutable.ListBuffer import scala.reflect.Manifest +import annotation.tailrec import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set => JSet} -import annotation.tailrec import akka.util.ReflectiveAccess._ -import akka.util. {ReflectiveAccess, ReadWriteGuard, ListenerManagement} +import akka.util.{ReflectiveAccess, ReadWriteGuard, ListenerManagement} /** * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. @@ -20,43 +20,180 @@ import akka.util. {ReflectiveAccess, ReadWriteGuard, ListenerManagement} * @author Jonas Bonér */ sealed trait ActorRegistryEvent -case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent -case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent +case class ActorRegistered(address: String, actor: ActorRef) extends ActorRegistryEvent +case class ActorUnregistered(address: String, actor: ActorRef) extends ActorRegistryEvent /** * Registry holding all Actor instances in the whole system. - * Mapped by: - *

    - *
  • the Actor's UUID
  • - *
  • the Actor's id field (which can be set by user-code)
  • - *
  • the Actor's class
  • - *
  • all Actors that are subtypes of a specific type
  • - *
      + * Mapped by address which is a unique string. * * @author Jonas Bonér */ private[actor] final class ActorRegistry private[actor] () extends ListenerManagement { - private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] - private val actorsById = new Index[String,ActorRef] - private val guard = new ReadWriteGuard + //private val isClusterEnabled = ReflectiveAccess.isClusterEnabled + private val actorsByAddress = new ConcurrentHashMap[String, ActorRef] + private val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private val guard = new ReadWriteGuard + + val local = new LocalActorRegistry(actorsByAddress, actorsByUuid) + + /** + * Finds the actor that has a specific address. + */ + def actorFor(address: String): Option[ActorRef] = { + if (actorsByAddress.containsKey(address)) Some(actorsByAddress.get(address)) +// else if (isClusterEnabled) ClusterModule.node.use(address) + else None + } + + /** + * Finds the typed actors that have a specific address. + */ + def typedActorFor(address: String): Option[AnyRef] = { + TypedActorModule.ensureEnabled + actorFor(address) map (typedActorFor(_)) + } + + /** + * Registers an actor in the ActorRegistry. + */ + private[akka] def register(actor: ActorRef) { + val address = actor.address + if (actorsByAddress.containsKey(address) || registeredInCluster(address)) + throw new IllegalStateException("Actor 'address' [" + address + "] is already in use, can't register actor [" + actor + "]") + actorsByAddress.put(address, actor) + actorsByUuid.put(actor.uuid.toString, actor) + //if (isClusterEnabled) registerInCluster(address, actor) + notifyListeners(ActorRegistered(address, actor)) + } + + /** + * Unregisters an actor in the ActorRegistry. + */ + private[akka] def unregister(address: String) { + val actor = actorsByAddress remove address + actorsByUuid remove actor.uuid + //if (isClusterEnabled) unregisterInCluster(address) + notifyListeners(ActorUnregistered(address, actor)) + } + + /** + * Unregisters an actor in the ActorRegistry. + */ + private[akka] def unregister(actor: ActorRef) { + val address = actor.address + actorsByAddress remove address + actorsByUuid remove actor.uuid + //if (isClusterEnabled) unregisterInCluster(address) + notifyListeners(ActorUnregistered(address, actor)) + } + + /** + * Registers an actor in the Cluster ActorRegistry. + */ + private[akka] def registeredInCluster(address: String): Boolean = { + false // FIXME call out to cluster + } + + /** + * Registers an actor in the Cluster ActorRegistry. + */ + private[akka] def registerInCluster(address: String, actor: ActorRef) { + ClusterModule.node.store(address, actor) + } + + /** + * Unregisters an actor in the Cluster ActorRegistry. + */ + private[akka] def unregisterInCluster(address: String) { + ClusterModule.node.remove(address) + } + + /** + * Get the typed actor proxy for a given typed actor ref. + */ + private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = { + TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef) + } +} + +/** + * View over the local actor registry. + */ +class LocalActorRegistry( + private val actorsByAddress: ConcurrentHashMap[String, ActorRef], + private val actorsByUuid: ConcurrentHashMap[String, ActorRef]) { + + /** + * Returns the number of actors in the system. + */ + def size: Int = actorsByAddress.size + + /** + * Shuts down and unregisters all actors in the system. + */ + def shutdownAll() { + if (TypedActorModule.isEnabled) { + val elements = actorsByAddress.elements + while (elements.hasMoreElements) { + val actorRef = elements.nextElement + val proxy = typedActorFor(actorRef) + if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) + else actorRef.stop + } + } else foreach(_.stop) + if (RemoteModule.isEnabled) Actor.remote.clear //TODO: REVISIT: Should this be here? + actorsByAddress.clear + actorsByUuid.clear + } + + //============== ACTORS ============== + + /** + * Finds the actor that have a specific address. + */ + def actorFor(address: String): Option[ActorRef] = { + if (actorsByAddress.containsKey(address)) Some(actorsByAddress.get(address)) + else None + } + + /** + * Finds the actor that have a specific uuid. + */ + private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = { + val uuidAsString = uuid.toString + if (actorsByUuid.containsKey(uuidAsString)) Some(actorsByUuid.get(uuidAsString)) + else None + } + + /** + * Finds the typed actor that have a specific address. + */ + def typedActorFor(address: String): Option[AnyRef] = { + TypedActorModule.ensureEnabled + actorFor(address) map (typedActorFor(_)) getOrElse None + } + + /** + * Finds the typed actor that have a specific uuid. + */ + private[akka] def typedActorFor(uuid: Uuid): Option[AnyRef] = { + TypedActorModule.ensureEnabled + actorFor(uuid) map (typedActorFor(_)) getOrElse None + } /** * Returns all actors in the system. */ def actors: Array[ActorRef] = filter(_ => true) - /** - * Returns the number of actors in the system. - */ - def size: Int = actorsByUUID.size - /** * Invokes a function for all actors. */ def foreach(f: (ActorRef) => Unit) = { - val elements = actorsByUUID.elements + val elements = actorsByAddress.elements while (elements.hasMoreElements) f(elements.nextElement) } @@ -64,8 +201,8 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag * Invokes the function on all known actors until it returns Some * Returns None if the function never returns Some */ - def find[T](f: PartialFunction[ActorRef,T]) : Option[T] = { - val elements = actorsByUUID.elements + def find[T](f: PartialFunction[ActorRef, T]) : Option[T] = { + val elements = actorsByAddress.elements while (elements.hasMoreElements) { val element = elements.nextElement if (f isDefinedAt element) return Some(f(element)) @@ -73,18 +210,12 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag None } - /** - * Finds all actors that are subtypes of the class passed in as the Manifest argument and supporting passed message. - */ - def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): Array[ActorRef] = - filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message)) - /** * Finds all actors that satisfy a predicate. */ def filter(p: ActorRef => Boolean): Array[ActorRef] = { val all = new ListBuffer[ActorRef] - val elements = actorsByUUID.elements + val elements = actorsByAddress.elements while (elements.hasMoreElements) { val actorId = elements.nextElement if (p(actorId)) all += actorId @@ -92,33 +223,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag all.toArray } - /** - * Finds all actors that are subtypes of the class passed in as the Manifest argument. - */ - def actorsFor[T <: Actor](implicit manifest: Manifest[T]): Array[ActorRef] = - actorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) - - /** - * Finds any actor that matches T. Very expensive, traverses ALL alive actors. - */ - def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] = - find({ case a: ActorRef if manifest.erasure.isAssignableFrom(a.actor.getClass) => a }) - - /** - * Finds all actors of type or sub-type specified by the class passed in as the Class argument. - */ - def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] = - filter(a => clazz.isAssignableFrom(a.actor.getClass)) - - /** - * Finds all actors that has a specific id. - */ - def actorsFor(id: String): Array[ActorRef] = actorsById values id - - /** - * Finds the actor that has a specific UUID. - */ - def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid) + //============== TYPED ACTORS ============== /** * Returns all typed actors in the system. @@ -130,7 +235,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag */ def foreachTypedActor(f: (AnyRef) => Unit) = { TypedActorModule.ensureEnabled - val elements = actorsByUUID.elements + val elements = actorsByAddress.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) if (proxy.isDefined) f(proxy.get) @@ -143,7 +248,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag */ def findTypedActor[T](f: PartialFunction[AnyRef,T]) : Option[T] = { TypedActorModule.ensureEnabled - val elements = actorsByUUID.elements + val elements = actorsByAddress.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy)) @@ -157,7 +262,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = { TypedActorModule.ensureEnabled val all = new ListBuffer[AnyRef] - val elements = actorsByUUID.elements + val elements = actorsByAddress.elements while (elements.hasMoreElements) { val proxy = typedActorFor(elements.nextElement) if (proxy.isDefined && p(proxy.get)) all += proxy.get @@ -165,114 +270,17 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag all.toArray } - /** - * Finds all typed actors that are subtypes of the class passed in as the Manifest argument. - */ - def typedActorsFor[T <: AnyRef](implicit manifest: Manifest[T]): Array[AnyRef] = { - TypedActorModule.ensureEnabled - typedActorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) - } - - /** - * Finds any typed actor that matches T. - */ - def typedActorFor[T <: AnyRef](implicit manifest: Manifest[T]): Option[AnyRef] = { - TypedActorModule.ensureEnabled - def predicate(proxy: AnyRef): Boolean = { - val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) - actorRef.isDefined && manifest.erasure.isAssignableFrom(actorRef.get.actor.getClass) - } - findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a }) - } - - /** - * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument. - */ - def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = { - TypedActorModule.ensureEnabled - def predicate(proxy: AnyRef): Boolean = { - val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) - actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass) - } - filterTypedActors(predicate) - } - - /** - * Finds all typed actors that have a specific id. - */ - def typedActorsFor(id: String): Array[AnyRef] = { - TypedActorModule.ensureEnabled - val actorRefs = actorsById values id - actorRefs.flatMap(typedActorFor(_)) - } - - /** - * Finds the typed actor that has a specific UUID. - */ - def typedActorFor(uuid: Uuid): Option[AnyRef] = { - TypedActorModule.ensureEnabled - val actorRef = actorsByUUID get uuid - if (actorRef eq null) None - else typedActorFor(actorRef) - } - /** * Get the typed actor proxy for a given typed actor ref. */ private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = { TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef) } - - /** - * Registers an actor in the ActorRegistry. - */ - private[akka] def register(actor: ActorRef) { - val id = actor.id - val uuid = actor.uuid - - actorsById.put(id, actor) - actorsByUUID.put(uuid, actor) - - // notify listeners - notifyListeners(ActorRegistered(actor)) - } - - /** - * Unregisters an actor in the ActorRegistry. - */ - private[akka] def unregister(actor: ActorRef) { - val id = actor.id - val uuid = actor.uuid - - actorsByUUID remove uuid - actorsById.remove(id,actor) - - // notify listeners - notifyListeners(ActorUnregistered(actor)) - } - - /** - * Shuts down and unregisters all actors in the system. - */ - def shutdownAll() { - if (TypedActorModule.isEnabled) { - val elements = actorsByUUID.elements - while (elements.hasMoreElements) { - val actorRef = elements.nextElement - val proxy = typedActorFor(actorRef) - if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) - else actorRef.stop - } - } else foreach(_.stop) - if (Remote.isEnabled) { - Actor.remote.clear //TODO: REVISIT: Should this be here? - } - actorsByUUID.clear - actorsById.clear - } } /** + * FIXME move Index to its own file and put in akka.util. + * * An implementation of a ConcurrentMultiMap * Adds/remove is serialized over the specified key * Reads are fully concurrent <-- el-cheapo diff --git a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala index 4b96f9ab5d..92b29e8714 100644 --- a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala +++ b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala @@ -56,6 +56,6 @@ trait BootableActorLoaderService extends Bootable { abstract override def onUnload = { super.onUnload - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } } diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index a8cea099f6..c0f7677e4a 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -37,7 +37,7 @@ import akka.japi.{Creator, Procedure} * * } else if (msg.equals("ForwardMessage")) { * // Retreive an actor from the ActorRegistry by ID and get an ActorRef back - * ActorRef actorRef = Actor.registry.actorsFor("some-actor-id").head(); + * ActorRef actorRef = Actor.registry.local.actorsFor("some-actor-id").head(); * * } else throw new IllegalArgumentException("Unknown message: " + message); * } else throw new IllegalArgumentException("Unknown message: " + message); diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index a4387fadf1..7c4da6d1a5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -142,7 +142,7 @@ trait MessageDispatcher { val i = uuids.iterator while (i.hasNext()) { val uuid = i.next() - Actor.registry.actorFor(uuid) match { + Actor.registry.local.actorFor(uuid) match { case Some(actor) => actor.stop case None => {} } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala index c3ad4d9c79..6534634e08 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala @@ -8,7 +8,7 @@ import akka.actor.Actor import akka.event.EventHandler /** - * Remote client and server event listener that pipes the events to the standard Akka EventHander. + * RemoteModule client and server event listener that pipes the events to the standard Akka EventHander. * * @author Jonas Bonér */ diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 29d8185ef2..210db70514 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -271,16 +271,16 @@ trait RemoteServerModule extends RemoteModule { * Starts the server up */ def start(): RemoteServerModule = - start(ReflectiveAccess.Remote.configDefaultAddress.getAddress.getHostAddress, - ReflectiveAccess.Remote.configDefaultAddress.getPort, + start(ReflectiveAccess.RemoteModule.configDefaultAddress.getAddress.getHostAddress, + ReflectiveAccess.RemoteModule.configDefaultAddress.getPort, None) /** * Starts the server up */ def start(loader: ClassLoader): RemoteServerModule = - start(ReflectiveAccess.Remote.configDefaultAddress.getAddress.getHostAddress, - ReflectiveAccess.Remote.configDefaultAddress.getPort, + start(ReflectiveAccess.RemoteModule.configDefaultAddress.getAddress.getHostAddress, + ReflectiveAccess.RemoteModule.configDefaultAddress.getPort, Option(loader)) /** diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index f4ceba6ebe..942c81bb25 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -21,18 +21,71 @@ object ReflectiveAccess { val loader = getClass.getClassLoader - def isRemotingEnabled = Remote.isEnabled + lazy val isRemotingEnabled = RemoteModule.isEnabled lazy val isTypedActorEnabled = TypedActorModule.isEnabled + lazy val isClusterEnabled = ClusterModule.isEnabled - def ensureRemotingEnabled = Remote.ensureEnabled - def ensureTypedActorEnabled = TypedActorModule.ensureEnabled + def ensureClusterEnabled = ClusterModule.ensureEnabled + def ensureRemotingEnabled = RemoteModule.ensureEnabled + def ensureTypedActorEnabled = TypedActorModule.ensureEnabled + + /** + * Reflective access to the Cluster module. + * + * @author Jonas Bonér + */ + object ClusterModule { + lazy val isEnabled = clusterObjectInstance.isDefined + + def ensureEnabled = if (!isEnabled) { + val e = new ModuleNotAvailableException( + "Can't load the cluster module, make sure that akka-cluster.jar is on the classpath") + EventHandler.debug(this, e.toString) + throw e + } + + lazy val clusterObjectInstance: Option[Cluster] = getObjectFor("akka.cloud.cluster.Cluster$") + + lazy val serializerClass: Option[Class[_]] = getClassFor("akka.serialization.Serializer") + + lazy val node: ClusterNode = { + ensureEnabled + clusterObjectInstance.get.newNode() + } + + type ClusterNode = { + def nrOfActors: Int + def store[T <: Actor](address: String, actorRef: ActorRef): Unit +// (implicit format: Format[T]) + def remove(address: String): Unit + def use(address: String): Option[ActorRef] + } + + type Cluster = { + def newNode( + //nodeAddress: NodeAddress, + //zkServerAddresses: String, + //serializer: ZkSerializer + ): ClusterNode + } + + type Mailbox = { + def enqueue(message: MessageInvocation) + def dequeue: MessageInvocation + } + + type Serializer = { + def toBinary(obj: AnyRef): Array[Byte] + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef + } + } /** * Reflective access to the RemoteClient module. * * @author Jonas Bonér */ - object Remote { + object RemoteModule { val TRANSPORT = Config.config.getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport") private[akka] val configDefaultAddress = @@ -47,6 +100,7 @@ object ReflectiveAccess { EventHandler.debug(this, e.toString) throw e } + val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT) protected[akka] val defaultRemoteSupport: Option[() => RemoteSupport] = @@ -96,30 +150,6 @@ object ReflectiveAccess { } } - object AkkaCloudModule { - - type Mailbox = { - def enqueue(message: MessageInvocation) - def dequeue: MessageInvocation - } - - type Serializer = { - def toBinary(obj: AnyRef): Array[Byte] - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef - } - - lazy val isEnabled = clusterObjectInstance.isDefined - - val clusterObjectInstance: Option[AnyRef] = - getObjectFor("akka.cloud.cluster.Cluster$") - - val serializerClass: Option[Class[_]] = - getClassFor("akka.serialization.Serializer") - - def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException( - "Feature is only available in Akka Cloud") - } - val noParams = Array[Class[_]]() val noArgs = Array[AnyRef]() diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index 379cbfb36d..159b7eec68 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -6,6 +6,7 @@ package akka.http import akka.actor.{ActorRef, Actor} import akka.event.EventHandler +import akka.config.ConfigurationException import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.HttpServlet @@ -70,7 +71,8 @@ trait Mist { /** * The root endpoint actor */ - protected val _root = Actor.registry.actorsFor(RootActorID).head + protected val _root = Actor.registry.actorFor(RootActorID).getOrElse( + throw new ConfigurationException("akka.http.root-actor-id configuration option does not have a valid actor address [" + RootActorID + "]")) /** * Server-specific method factory diff --git a/akka-http/src/main/scala/akka/security/Security.scala b/akka-http/src/main/scala/akka/security/Security.scala index dce249de46..6c6577bcae 100644 --- a/akka-http/src/main/scala/akka/security/Security.scala +++ b/akka-http/src/main/scala/akka/security/Security.scala @@ -22,10 +22,10 @@ package akka.security -import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException} +import akka.actor.{Scheduler, Actor, ActorRef, IllegalActorStateException} import akka.event.EventHandler import akka.actor.Actor._ -import akka.config.Config +import akka.config.{Config, ConfigurationException} import com.sun.jersey.api.model.AbstractMethod import com.sun.jersey.spi.container.{ResourceFilterFactory, ContainerRequest, ContainerRequestFilter, ContainerResponse, ContainerResponseFilter, ResourceFilter} @@ -109,7 +109,9 @@ class AkkaSecurityFilterFactory extends ResourceFilterFactory { * Currently we always take the first, since there usually should be at most one authentication actor, but a round-robin * strategy could be implemented in the future */ - def authenticator: ActorRef = Actor.registry.actorsFor(authenticatorFQN).head + def authenticator: ActorRef = Actor.registry.actorFor(authenticatorFQN) + .getOrElse(throw new ConfigurationException( + "akka.http.authenticator configuration option does not have a valid actor address [" + authenticatorFQN + "]")) def mkFilter(roles: Option[List[String]]): java.util.List[ResourceFilter] = java.util.Collections.singletonList(new Filter(authenticator, roles)) diff --git a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java index 58e8badbfe..3bdbfa377b 100644 --- a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java @@ -1841,12 +1841,12 @@ public final class RemoteProtocol { return akka.remote.protocol.RemoteProtocol.internal_static_RemoteActorRefProtocol_fieldAccessorTable; } - // required string classOrServiceName = 1; - public static final int CLASSORSERVICENAME_FIELD_NUMBER = 1; - private boolean hasClassOrServiceName; - private java.lang.String classOrServiceName_ = ""; - public boolean hasClassOrServiceName() { return hasClassOrServiceName; } - public java.lang.String getClassOrServiceName() { return classOrServiceName_; } + // required string address = 1; + public static final int ADDRESS_FIELD_NUMBER = 1; + private boolean hasAddress; + private java.lang.String address_ = ""; + public boolean hasAddress() { return hasAddress; } + public java.lang.String getAddress() { return address_; } // required string actorClassname = 2; public static final int ACTORCLASSNAME_FIELD_NUMBER = 2; @@ -1855,45 +1855,32 @@ public final class RemoteProtocol { public boolean hasActorClassname() { return hasActorClassname; } public java.lang.String getActorClassname() { return actorClassname_; } - // required .AddressProtocol homeAddress = 3; - public static final int HOMEADDRESS_FIELD_NUMBER = 3; - private boolean hasHomeAddress; - private akka.remote.protocol.RemoteProtocol.AddressProtocol homeAddress_; - public boolean hasHomeAddress() { return hasHomeAddress; } - public akka.remote.protocol.RemoteProtocol.AddressProtocol getHomeAddress() { return homeAddress_; } - - // optional uint64 timeout = 4; - public static final int TIMEOUT_FIELD_NUMBER = 4; + // optional uint64 timeout = 3; + public static final int TIMEOUT_FIELD_NUMBER = 3; private boolean hasTimeout; private long timeout_ = 0L; public boolean hasTimeout() { return hasTimeout; } public long getTimeout() { return timeout_; } private void initFields() { - homeAddress_ = akka.remote.protocol.RemoteProtocol.AddressProtocol.getDefaultInstance(); } public final boolean isInitialized() { - if (!hasClassOrServiceName) return false; + if (!hasAddress) return false; if (!hasActorClassname) return false; - if (!hasHomeAddress) return false; - if (!getHomeAddress().isInitialized()) return false; return true; } public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); - if (hasClassOrServiceName()) { - output.writeString(1, getClassOrServiceName()); + if (hasAddress()) { + output.writeString(1, getAddress()); } if (hasActorClassname()) { output.writeString(2, getActorClassname()); } - if (hasHomeAddress()) { - output.writeMessage(3, getHomeAddress()); - } if (hasTimeout()) { - output.writeUInt64(4, getTimeout()); + output.writeUInt64(3, getTimeout()); } getUnknownFields().writeTo(output); } @@ -1904,21 +1891,17 @@ public final class RemoteProtocol { if (size != -1) return size; size = 0; - if (hasClassOrServiceName()) { + if (hasAddress()) { size += com.google.protobuf.CodedOutputStream - .computeStringSize(1, getClassOrServiceName()); + .computeStringSize(1, getAddress()); } if (hasActorClassname()) { size += com.google.protobuf.CodedOutputStream .computeStringSize(2, getActorClassname()); } - if (hasHomeAddress()) { - size += com.google.protobuf.CodedOutputStream - .computeMessageSize(3, getHomeAddress()); - } if (hasTimeout()) { size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(4, getTimeout()); + .computeUInt64Size(3, getTimeout()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -2078,15 +2061,12 @@ public final class RemoteProtocol { public Builder mergeFrom(akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol other) { if (other == akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance()) return this; - if (other.hasClassOrServiceName()) { - setClassOrServiceName(other.getClassOrServiceName()); + if (other.hasAddress()) { + setAddress(other.getAddress()); } if (other.hasActorClassname()) { setActorClassname(other.getActorClassname()); } - if (other.hasHomeAddress()) { - mergeHomeAddress(other.getHomeAddress()); - } if (other.hasTimeout()) { setTimeout(other.getTimeout()); } @@ -2116,23 +2096,14 @@ public final class RemoteProtocol { break; } case 10: { - setClassOrServiceName(input.readString()); + setAddress(input.readString()); break; } case 18: { setActorClassname(input.readString()); break; } - case 26: { - akka.remote.protocol.RemoteProtocol.AddressProtocol.Builder subBuilder = akka.remote.protocol.RemoteProtocol.AddressProtocol.newBuilder(); - if (hasHomeAddress()) { - subBuilder.mergeFrom(getHomeAddress()); - } - input.readMessage(subBuilder, extensionRegistry); - setHomeAddress(subBuilder.buildPartial()); - break; - } - case 32: { + case 24: { setTimeout(input.readUInt64()); break; } @@ -2141,24 +2112,24 @@ public final class RemoteProtocol { } - // required string classOrServiceName = 1; - public boolean hasClassOrServiceName() { - return result.hasClassOrServiceName(); + // required string address = 1; + public boolean hasAddress() { + return result.hasAddress(); } - public java.lang.String getClassOrServiceName() { - return result.getClassOrServiceName(); + public java.lang.String getAddress() { + return result.getAddress(); } - public Builder setClassOrServiceName(java.lang.String value) { + public Builder setAddress(java.lang.String value) { if (value == null) { throw new NullPointerException(); } - result.hasClassOrServiceName = true; - result.classOrServiceName_ = value; + result.hasAddress = true; + result.address_ = value; return this; } - public Builder clearClassOrServiceName() { - result.hasClassOrServiceName = false; - result.classOrServiceName_ = getDefaultInstance().getClassOrServiceName(); + public Builder clearAddress() { + result.hasAddress = false; + result.address_ = getDefaultInstance().getAddress(); return this; } @@ -2183,44 +2154,7 @@ public final class RemoteProtocol { return this; } - // required .AddressProtocol homeAddress = 3; - public boolean hasHomeAddress() { - return result.hasHomeAddress(); - } - public akka.remote.protocol.RemoteProtocol.AddressProtocol getHomeAddress() { - return result.getHomeAddress(); - } - public Builder setHomeAddress(akka.remote.protocol.RemoteProtocol.AddressProtocol value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasHomeAddress = true; - result.homeAddress_ = value; - return this; - } - public Builder setHomeAddress(akka.remote.protocol.RemoteProtocol.AddressProtocol.Builder builderForValue) { - result.hasHomeAddress = true; - result.homeAddress_ = builderForValue.build(); - return this; - } - public Builder mergeHomeAddress(akka.remote.protocol.RemoteProtocol.AddressProtocol value) { - if (result.hasHomeAddress() && - result.homeAddress_ != akka.remote.protocol.RemoteProtocol.AddressProtocol.getDefaultInstance()) { - result.homeAddress_ = - akka.remote.protocol.RemoteProtocol.AddressProtocol.newBuilder(result.homeAddress_).mergeFrom(value).buildPartial(); - } else { - result.homeAddress_ = value; - } - result.hasHomeAddress = true; - return this; - } - public Builder clearHomeAddress() { - result.hasHomeAddress = false; - result.homeAddress_ = akka.remote.protocol.RemoteProtocol.AddressProtocol.getDefaultInstance(); - return this; - } - - // optional uint64 timeout = 4; + // optional uint64 timeout = 3; public boolean hasTimeout() { return result.hasTimeout(); } @@ -2638,12 +2572,12 @@ public final class RemoteProtocol { public boolean hasUuid() { return hasUuid; } public akka.remote.protocol.RemoteProtocol.UuidProtocol getUuid() { return uuid_; } - // required string id = 2; - public static final int ID_FIELD_NUMBER = 2; - private boolean hasId; - private java.lang.String id_ = ""; - public boolean hasId() { return hasId; } - public java.lang.String getId() { return id_; } + // required string address = 2; + public static final int ADDRESS_FIELD_NUMBER = 2; + private boolean hasAddress; + private java.lang.String address_ = ""; + public boolean hasAddress() { return hasAddress; } + public java.lang.String getAddress() { return address_; } // required string actorClassname = 3; public static final int ACTORCLASSNAME_FIELD_NUMBER = 3; @@ -2652,64 +2586,57 @@ public final class RemoteProtocol { public boolean hasActorClassname() { return hasActorClassname; } public java.lang.String getActorClassname() { return actorClassname_; } - // required .AddressProtocol originalAddress = 4; - public static final int ORIGINALADDRESS_FIELD_NUMBER = 4; - private boolean hasOriginalAddress; - private akka.remote.protocol.RemoteProtocol.AddressProtocol originalAddress_; - public boolean hasOriginalAddress() { return hasOriginalAddress; } - public akka.remote.protocol.RemoteProtocol.AddressProtocol getOriginalAddress() { return originalAddress_; } - - // optional bytes actorInstance = 5; - public static final int ACTORINSTANCE_FIELD_NUMBER = 5; + // optional bytes actorInstance = 4; + public static final int ACTORINSTANCE_FIELD_NUMBER = 4; private boolean hasActorInstance; private com.google.protobuf.ByteString actorInstance_ = com.google.protobuf.ByteString.EMPTY; public boolean hasActorInstance() { return hasActorInstance; } public com.google.protobuf.ByteString getActorInstance() { return actorInstance_; } - // optional string serializerClassname = 6; - public static final int SERIALIZERCLASSNAME_FIELD_NUMBER = 6; + // optional string serializerClassname = 5; + public static final int SERIALIZERCLASSNAME_FIELD_NUMBER = 5; private boolean hasSerializerClassname; private java.lang.String serializerClassname_ = ""; public boolean hasSerializerClassname() { return hasSerializerClassname; } public java.lang.String getSerializerClassname() { return serializerClassname_; } - // optional uint64 timeout = 7; - public static final int TIMEOUT_FIELD_NUMBER = 7; + // optional uint64 timeout = 6; + public static final int TIMEOUT_FIELD_NUMBER = 6; private boolean hasTimeout; private long timeout_ = 0L; public boolean hasTimeout() { return hasTimeout; } public long getTimeout() { return timeout_; } - // optional uint64 receiveTimeout = 8; - public static final int RECEIVETIMEOUT_FIELD_NUMBER = 8; + // optional uint64 receiveTimeout = 7; + public static final int RECEIVETIMEOUT_FIELD_NUMBER = 7; private boolean hasReceiveTimeout; private long receiveTimeout_ = 0L; public boolean hasReceiveTimeout() { return hasReceiveTimeout; } public long getReceiveTimeout() { return receiveTimeout_; } - // optional .LifeCycleProtocol lifeCycle = 9; - public static final int LIFECYCLE_FIELD_NUMBER = 9; + // optional .LifeCycleProtocol lifeCycle = 8; + public static final int LIFECYCLE_FIELD_NUMBER = 8; private boolean hasLifeCycle; private akka.remote.protocol.RemoteProtocol.LifeCycleProtocol lifeCycle_; public boolean hasLifeCycle() { return hasLifeCycle; } public akka.remote.protocol.RemoteProtocol.LifeCycleProtocol getLifeCycle() { return lifeCycle_; } - // optional .RemoteActorRefProtocol supervisor = 10; - public static final int SUPERVISOR_FIELD_NUMBER = 10; + // optional .RemoteActorRefProtocol supervisor = 9; + public static final int SUPERVISOR_FIELD_NUMBER = 9; private boolean hasSupervisor; private akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol supervisor_; public boolean hasSupervisor() { return hasSupervisor; } public akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getSupervisor() { return supervisor_; } - // optional bytes hotswapStack = 11; - public static final int HOTSWAPSTACK_FIELD_NUMBER = 11; + // optional bytes hotswapStack = 10; + public static final int HOTSWAPSTACK_FIELD_NUMBER = 10; private boolean hasHotswapStack; private com.google.protobuf.ByteString hotswapStack_ = com.google.protobuf.ByteString.EMPTY; public boolean hasHotswapStack() { return hasHotswapStack; } public com.google.protobuf.ByteString getHotswapStack() { return hotswapStack_; } - // repeated .RemoteMessageProtocol messages = 12; - public static final int MESSAGES_FIELD_NUMBER = 12; + // repeated .RemoteMessageProtocol messages = 11; + public static final int MESSAGES_FIELD_NUMBER = 11; private java.util.List messages_ = java.util.Collections.emptyList(); public java.util.List getMessagesList() { @@ -2722,17 +2649,14 @@ public final class RemoteProtocol { private void initFields() { uuid_ = akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); - originalAddress_ = akka.remote.protocol.RemoteProtocol.AddressProtocol.getDefaultInstance(); lifeCycle_ = akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.getDefaultInstance(); supervisor_ = akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance(); } public final boolean isInitialized() { if (!hasUuid) return false; - if (!hasId) return false; + if (!hasAddress) return false; if (!hasActorClassname) return false; - if (!hasOriginalAddress) return false; if (!getUuid().isInitialized()) return false; - if (!getOriginalAddress().isInitialized()) return false; if (hasLifeCycle()) { if (!getLifeCycle().isInitialized()) return false; } @@ -2751,38 +2675,35 @@ public final class RemoteProtocol { if (hasUuid()) { output.writeMessage(1, getUuid()); } - if (hasId()) { - output.writeString(2, getId()); + if (hasAddress()) { + output.writeString(2, getAddress()); } if (hasActorClassname()) { output.writeString(3, getActorClassname()); } - if (hasOriginalAddress()) { - output.writeMessage(4, getOriginalAddress()); - } if (hasActorInstance()) { - output.writeBytes(5, getActorInstance()); + output.writeBytes(4, getActorInstance()); } if (hasSerializerClassname()) { - output.writeString(6, getSerializerClassname()); + output.writeString(5, getSerializerClassname()); } if (hasTimeout()) { - output.writeUInt64(7, getTimeout()); + output.writeUInt64(6, getTimeout()); } if (hasReceiveTimeout()) { - output.writeUInt64(8, getReceiveTimeout()); + output.writeUInt64(7, getReceiveTimeout()); } if (hasLifeCycle()) { - output.writeMessage(9, getLifeCycle()); + output.writeMessage(8, getLifeCycle()); } if (hasSupervisor()) { - output.writeMessage(10, getSupervisor()); + output.writeMessage(9, getSupervisor()); } if (hasHotswapStack()) { - output.writeBytes(11, getHotswapStack()); + output.writeBytes(10, getHotswapStack()); } for (akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol element : getMessagesList()) { - output.writeMessage(12, element); + output.writeMessage(11, element); } getUnknownFields().writeTo(output); } @@ -2797,49 +2718,45 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(1, getUuid()); } - if (hasId()) { + if (hasAddress()) { size += com.google.protobuf.CodedOutputStream - .computeStringSize(2, getId()); + .computeStringSize(2, getAddress()); } if (hasActorClassname()) { size += com.google.protobuf.CodedOutputStream .computeStringSize(3, getActorClassname()); } - if (hasOriginalAddress()) { - size += com.google.protobuf.CodedOutputStream - .computeMessageSize(4, getOriginalAddress()); - } if (hasActorInstance()) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(5, getActorInstance()); + .computeBytesSize(4, getActorInstance()); } if (hasSerializerClassname()) { size += com.google.protobuf.CodedOutputStream - .computeStringSize(6, getSerializerClassname()); + .computeStringSize(5, getSerializerClassname()); } if (hasTimeout()) { size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(7, getTimeout()); + .computeUInt64Size(6, getTimeout()); } if (hasReceiveTimeout()) { size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(8, getReceiveTimeout()); + .computeUInt64Size(7, getReceiveTimeout()); } if (hasLifeCycle()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(9, getLifeCycle()); + .computeMessageSize(8, getLifeCycle()); } if (hasSupervisor()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(10, getSupervisor()); + .computeMessageSize(9, getSupervisor()); } if (hasHotswapStack()) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(11, getHotswapStack()); + .computeBytesSize(10, getHotswapStack()); } for (akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol element : getMessagesList()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(12, element); + .computeMessageSize(11, element); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -3006,15 +2923,12 @@ public final class RemoteProtocol { if (other.hasUuid()) { mergeUuid(other.getUuid()); } - if (other.hasId()) { - setId(other.getId()); + if (other.hasAddress()) { + setAddress(other.getAddress()); } if (other.hasActorClassname()) { setActorClassname(other.getActorClassname()); } - if (other.hasOriginalAddress()) { - mergeOriginalAddress(other.getOriginalAddress()); - } if (other.hasActorInstance()) { setActorInstance(other.getActorInstance()); } @@ -3077,7 +2991,7 @@ public final class RemoteProtocol { break; } case 18: { - setId(input.readString()); + setAddress(input.readString()); break; } case 26: { @@ -3085,31 +2999,22 @@ public final class RemoteProtocol { break; } case 34: { - akka.remote.protocol.RemoteProtocol.AddressProtocol.Builder subBuilder = akka.remote.protocol.RemoteProtocol.AddressProtocol.newBuilder(); - if (hasOriginalAddress()) { - subBuilder.mergeFrom(getOriginalAddress()); - } - input.readMessage(subBuilder, extensionRegistry); - setOriginalAddress(subBuilder.buildPartial()); - break; - } - case 42: { setActorInstance(input.readBytes()); break; } - case 50: { + case 42: { setSerializerClassname(input.readString()); break; } - case 56: { + case 48: { setTimeout(input.readUInt64()); break; } - case 64: { + case 56: { setReceiveTimeout(input.readUInt64()); break; } - case 74: { + case 66: { akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder subBuilder = akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.newBuilder(); if (hasLifeCycle()) { subBuilder.mergeFrom(getLifeCycle()); @@ -3118,7 +3023,7 @@ public final class RemoteProtocol { setLifeCycle(subBuilder.buildPartial()); break; } - case 82: { + case 74: { akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(); if (hasSupervisor()) { subBuilder.mergeFrom(getSupervisor()); @@ -3127,11 +3032,11 @@ public final class RemoteProtocol { setSupervisor(subBuilder.buildPartial()); break; } - case 90: { + case 82: { setHotswapStack(input.readBytes()); break; } - case 98: { + case 90: { akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol.Builder subBuilder = akka.remote.protocol.RemoteProtocol.RemoteMessageProtocol.newBuilder(); input.readMessage(subBuilder, extensionRegistry); addMessages(subBuilder.buildPartial()); @@ -3179,24 +3084,24 @@ public final class RemoteProtocol { return this; } - // required string id = 2; - public boolean hasId() { - return result.hasId(); + // required string address = 2; + public boolean hasAddress() { + return result.hasAddress(); } - public java.lang.String getId() { - return result.getId(); + public java.lang.String getAddress() { + return result.getAddress(); } - public Builder setId(java.lang.String value) { + public Builder setAddress(java.lang.String value) { if (value == null) { throw new NullPointerException(); } - result.hasId = true; - result.id_ = value; + result.hasAddress = true; + result.address_ = value; return this; } - public Builder clearId() { - result.hasId = false; - result.id_ = getDefaultInstance().getId(); + public Builder clearAddress() { + result.hasAddress = false; + result.address_ = getDefaultInstance().getAddress(); return this; } @@ -3221,44 +3126,7 @@ public final class RemoteProtocol { return this; } - // required .AddressProtocol originalAddress = 4; - public boolean hasOriginalAddress() { - return result.hasOriginalAddress(); - } - public akka.remote.protocol.RemoteProtocol.AddressProtocol getOriginalAddress() { - return result.getOriginalAddress(); - } - public Builder setOriginalAddress(akka.remote.protocol.RemoteProtocol.AddressProtocol value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasOriginalAddress = true; - result.originalAddress_ = value; - return this; - } - public Builder setOriginalAddress(akka.remote.protocol.RemoteProtocol.AddressProtocol.Builder builderForValue) { - result.hasOriginalAddress = true; - result.originalAddress_ = builderForValue.build(); - return this; - } - public Builder mergeOriginalAddress(akka.remote.protocol.RemoteProtocol.AddressProtocol value) { - if (result.hasOriginalAddress() && - result.originalAddress_ != akka.remote.protocol.RemoteProtocol.AddressProtocol.getDefaultInstance()) { - result.originalAddress_ = - akka.remote.protocol.RemoteProtocol.AddressProtocol.newBuilder(result.originalAddress_).mergeFrom(value).buildPartial(); - } else { - result.originalAddress_ = value; - } - result.hasOriginalAddress = true; - return this; - } - public Builder clearOriginalAddress() { - result.hasOriginalAddress = false; - result.originalAddress_ = akka.remote.protocol.RemoteProtocol.AddressProtocol.getDefaultInstance(); - return this; - } - - // optional bytes actorInstance = 5; + // optional bytes actorInstance = 4; public boolean hasActorInstance() { return result.hasActorInstance(); } @@ -3279,7 +3147,7 @@ public final class RemoteProtocol { return this; } - // optional string serializerClassname = 6; + // optional string serializerClassname = 5; public boolean hasSerializerClassname() { return result.hasSerializerClassname(); } @@ -3300,7 +3168,7 @@ public final class RemoteProtocol { return this; } - // optional uint64 timeout = 7; + // optional uint64 timeout = 6; public boolean hasTimeout() { return result.hasTimeout(); } @@ -3318,7 +3186,7 @@ public final class RemoteProtocol { return this; } - // optional uint64 receiveTimeout = 8; + // optional uint64 receiveTimeout = 7; public boolean hasReceiveTimeout() { return result.hasReceiveTimeout(); } @@ -3336,7 +3204,7 @@ public final class RemoteProtocol { return this; } - // optional .LifeCycleProtocol lifeCycle = 9; + // optional .LifeCycleProtocol lifeCycle = 8; public boolean hasLifeCycle() { return result.hasLifeCycle(); } @@ -3373,7 +3241,7 @@ public final class RemoteProtocol { return this; } - // optional .RemoteActorRefProtocol supervisor = 10; + // optional .RemoteActorRefProtocol supervisor = 9; public boolean hasSupervisor() { return result.hasSupervisor(); } @@ -3410,7 +3278,7 @@ public final class RemoteProtocol { return this; } - // optional bytes hotswapStack = 11; + // optional bytes hotswapStack = 10; public boolean hasHotswapStack() { return result.hasHotswapStack(); } @@ -3431,7 +3299,7 @@ public final class RemoteProtocol { return this; } - // repeated .RemoteMessageProtocol messages = 12; + // repeated .RemoteMessageProtocol messages = 11; public java.util.List getMessagesList() { return java.util.Collections.unmodifiableList(result.messages_); } @@ -4290,12 +4158,12 @@ public final class RemoteProtocol { public boolean hasTypedActorInfo() { return hasTypedActorInfo; } public akka.remote.protocol.RemoteProtocol.TypedActorInfoProtocol getTypedActorInfo() { return typedActorInfo_; } - // optional string id = 6; - public static final int ID_FIELD_NUMBER = 6; - private boolean hasId; - private java.lang.String id_ = ""; - public boolean hasId() { return hasId; } - public java.lang.String getId() { return id_; } + // optional string address = 6; + public static final int ADDRESS_FIELD_NUMBER = 6; + private boolean hasAddress; + private java.lang.String address_ = ""; + public boolean hasAddress() { return hasAddress; } + public java.lang.String getAddress() { return address_; } private void initFields() { uuid_ = akka.remote.protocol.RemoteProtocol.UuidProtocol.getDefaultInstance(); @@ -4332,8 +4200,8 @@ public final class RemoteProtocol { if (hasTypedActorInfo()) { output.writeMessage(5, getTypedActorInfo()); } - if (hasId()) { - output.writeString(6, getId()); + if (hasAddress()) { + output.writeString(6, getAddress()); } getUnknownFields().writeTo(output); } @@ -4364,9 +4232,9 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeMessageSize(5, getTypedActorInfo()); } - if (hasId()) { + if (hasAddress()) { size += com.google.protobuf.CodedOutputStream - .computeStringSize(6, getId()); + .computeStringSize(6, getAddress()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -4541,8 +4409,8 @@ public final class RemoteProtocol { if (other.hasTypedActorInfo()) { mergeTypedActorInfo(other.getTypedActorInfo()); } - if (other.hasId()) { - setId(other.getId()); + if (other.hasAddress()) { + setAddress(other.getAddress()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -4606,7 +4474,7 @@ public final class RemoteProtocol { break; } case 50: { - setId(input.readString()); + setAddress(input.readString()); break; } } @@ -4748,24 +4616,24 @@ public final class RemoteProtocol { return this; } - // optional string id = 6; - public boolean hasId() { - return result.hasId(); + // optional string address = 6; + public boolean hasAddress() { + return result.hasAddress(); } - public java.lang.String getId() { - return result.getId(); + public java.lang.String getAddress() { + return result.getAddress(); } - public Builder setId(java.lang.String value) { + public Builder setAddress(java.lang.String value) { if (value == null) { throw new NullPointerException(); } - result.hasId = true; - result.id_ = value; + result.hasAddress = true; + result.address_ = value; return this; } - public Builder clearId() { - result.hasId = false; - result.id_ = getDefaultInstance().getId(); + public Builder clearAddress() { + result.hasAddress = false; + result.address_ = getDefaultInstance().getAddress(); return this; } @@ -6819,46 +6687,44 @@ public final class RemoteProtocol { "tadata\030\010 \003(\0132\026.MetadataEntryProtocol\022\016\n\006" + "cookie\030\t \001(\t\"J\n\025RemoteControlProtocol\022\016\n" + "\006cookie\030\001 \001(\t\022!\n\013commandType\030\002 \002(\0162\014.Com" + - "mandType\"\204\001\n\026RemoteActorRefProtocol\022\032\n\022c" + - "lassOrServiceName\030\001 \002(\t\022\026\n\016actorClassnam" + - "e\030\002 \002(\t\022%\n\013homeAddress\030\003 \002(\0132\020.AddressPr" + - "otocol\022\017\n\007timeout\030\004 \001(\004\"_\n\033RemoteTypedAc" + - "torRefProtocol\022)\n\010actorRef\030\001 \002(\0132\027.Remot" + - "eActorRefProtocol\022\025\n\rinterfaceName\030\002 \002(\t" + - "\"\371\002\n\032SerializedActorRefProtocol\022\033\n\004uuid\030", - "\001 \002(\0132\r.UuidProtocol\022\n\n\002id\030\002 \002(\t\022\026\n\016acto" + - "rClassname\030\003 \002(\t\022)\n\017originalAddress\030\004 \002(" + - "\0132\020.AddressProtocol\022\025\n\ractorInstance\030\005 \001" + - "(\014\022\033\n\023serializerClassname\030\006 \001(\t\022\017\n\007timeo" + - "ut\030\007 \001(\004\022\026\n\016receiveTimeout\030\010 \001(\004\022%\n\tlife" + - "Cycle\030\t \001(\0132\022.LifeCycleProtocol\022+\n\nsuper" + - "visor\030\n \001(\0132\027.RemoteActorRefProtocol\022\024\n\014" + - "hotswapStack\030\013 \001(\014\022(\n\010messages\030\014 \003(\0132\026.R" + - "emoteMessageProtocol\"g\n\037SerializedTypedA" + - "ctorRefProtocol\022-\n\010actorRef\030\001 \002(\0132\033.Seri", - "alizedActorRefProtocol\022\025\n\rinterfaceName\030" + - "\002 \002(\t\"r\n\017MessageProtocol\0225\n\023serializatio" + - "nScheme\030\001 \002(\0162\030.SerializationSchemeType\022" + - "\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001(" + - "\014\"\255\001\n\021ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r." + - "UuidProtocol\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030" + - "\003 \002(\004\022\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016" + - "typedActorInfo\030\005 \001(\0132\027.TypedActorInfoPro" + - "tocol\022\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProto" + - "col\022\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\")", - "\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(" + - "\004\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(\t\022" + - "\r\n\005value\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tl" + - "ifeCycle\030\001 \002(\0162\016.LifeCycleType\"1\n\017Addres" + - "sProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(" + - "\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(" + - "\t\022\017\n\007message\030\002 \002(\t*\033\n\013CommandType\022\014\n\010SHU" + - "TDOWN\020\001*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016\n" + - "\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Serial" + - "izationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002", - "\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTO" + - "BUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n" + - "\tTEMPORARY\020\002B\030\n\024akka.remote.protocolH\001" + "mandType\"R\n\026RemoteActorRefProtocol\022\017\n\007ad" + + "dress\030\001 \002(\t\022\026\n\016actorClassname\030\002 \002(\t\022\017\n\007t" + + "imeout\030\003 \001(\004\"_\n\033RemoteTypedActorRefProto" + + "col\022)\n\010actorRef\030\001 \002(\0132\027.RemoteActorRefPr" + + "otocol\022\025\n\rinterfaceName\030\002 \002(\t\"\323\002\n\032Serial" + + "izedActorRefProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uui" + + "dProtocol\022\017\n\007address\030\002 \002(\t\022\026\n\016actorClass", + "name\030\003 \002(\t\022\025\n\ractorInstance\030\004 \001(\014\022\033\n\023ser" + + "ializerClassname\030\005 \001(\t\022\017\n\007timeout\030\006 \001(\004\022" + + "\026\n\016receiveTimeout\030\007 \001(\004\022%\n\tlifeCycle\030\010 \001" + + "(\0132\022.LifeCycleProtocol\022+\n\nsupervisor\030\t \001" + + "(\0132\027.RemoteActorRefProtocol\022\024\n\014hotswapSt" + + "ack\030\n \001(\014\022(\n\010messages\030\013 \003(\0132\026.RemoteMess" + + "ageProtocol\"g\n\037SerializedTypedActorRefPr" + + "otocol\022-\n\010actorRef\030\001 \002(\0132\033.SerializedAct" + + "orRefProtocol\022\025\n\rinterfaceName\030\002 \002(\t\"r\n\017" + + "MessageProtocol\0225\n\023serializationScheme\030\001", + " \002(\0162\030.SerializationSchemeType\022\017\n\007messag" + + "e\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001(\014\"\262\001\n\021Act" + + "orInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProto" + + "col\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030\003 \002(\004\022\035\n\t" + + "actorType\030\004 \002(\0162\n.ActorType\022/\n\016typedActo" + + "rInfo\030\005 \001(\0132\027.TypedActorInfoProtocol\022\017\n\007" + + "address\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022" + + "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\")\n\014Uu" + + "idProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n" + + "\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005v", + "alue\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeC" + + "ycle\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressPro" + + "tocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n" + + "\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n" + + "\007message\030\002 \002(\t*\033\n\013CommandType\022\014\n\010SHUTDOW" + + "N\020\001*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016\n\nJAV" + + "A_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Serializat" + + "ionSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\n" + + "SCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020" + + "\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tTEM", + "PORARY\020\002B\030\n\024akka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6894,7 +6760,7 @@ public final class RemoteProtocol { internal_static_RemoteActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteActorRefProtocol_descriptor, - new java.lang.String[] { "ClassOrServiceName", "ActorClassname", "HomeAddress", "Timeout", }, + new java.lang.String[] { "Address", "ActorClassname", "Timeout", }, akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.class, akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder.class); internal_static_RemoteTypedActorRefProtocol_descriptor = @@ -6910,7 +6776,7 @@ public final class RemoteProtocol { internal_static_SerializedActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SerializedActorRefProtocol_descriptor, - new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, + new java.lang.String[] { "Uuid", "Address", "ActorClassname", "ActorInstance", "SerializerClassname", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class, akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class); internal_static_SerializedTypedActorRefProtocol_descriptor = @@ -6934,7 +6800,7 @@ public final class RemoteProtocol { internal_static_ActorInfoProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ActorInfoProtocol_descriptor, - new java.lang.String[] { "Uuid", "Target", "Timeout", "ActorType", "TypedActorInfo", "Id", }, + new java.lang.String[] { "Uuid", "Target", "Timeout", "ActorType", "TypedActorInfo", "Address", }, akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.class, akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.Builder.class); internal_static_TypedActorInfoProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 209b204767..85148b72cf 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -51,10 +51,9 @@ enum CommandType { * on the original node. */ message RemoteActorRefProtocol { - required string classOrServiceName = 1; + required string address = 1; required string actorClassname = 2; - required AddressProtocol homeAddress = 3; - optional uint64 timeout = 4; + optional uint64 timeout = 3; } /** @@ -68,22 +67,21 @@ message RemoteTypedActorRefProtocol { /** * Defines a fully serialized remote ActorRef (with serialized Actor instance) - * that is about to be instantiated on the remote node. It is fully disconnected + * that is about to be instantiated on the remote node. It is fully disconnected * from its original host. */ message SerializedActorRefProtocol { required UuidProtocol uuid = 1; - required string id = 2; + required string address = 2; required string actorClassname = 3; - required AddressProtocol originalAddress = 4; - optional bytes actorInstance = 5; - optional string serializerClassname = 6; - optional uint64 timeout = 7; - optional uint64 receiveTimeout = 8; - optional LifeCycleProtocol lifeCycle = 9; - optional RemoteActorRefProtocol supervisor = 10; - optional bytes hotswapStack = 11; - repeated RemoteMessageProtocol messages = 12; + optional bytes actorInstance = 4; + optional string serializerClassname = 5; + optional uint64 timeout = 6; + optional uint64 receiveTimeout = 7; + optional LifeCycleProtocol lifeCycle = 8; + optional RemoteActorRefProtocol supervisor = 9; + optional bytes hotswapStack = 10; + repeated RemoteMessageProtocol messages = 11; } /** @@ -114,7 +112,7 @@ message ActorInfoProtocol { required uint64 timeout = 3; required ActorType actorType = 4; optional TypedActorInfoProtocol typedActorInfo = 5; - optional string id = 6; + optional string address = 6; } /** @@ -200,4 +198,4 @@ message AddressProtocol { message ExceptionProtocol { required string classname = 1; required string message = 2; -} \ No newline at end of file +} diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index bd586ce939..32855a289b 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -18,7 +18,7 @@ trait BootableRemoteActorService extends Bootable { protected lazy val remoteServerThread = new Thread(new Runnable() { def run = Actor.remote.start(self.applicationLoader.getOrElse(null)) //Use config host/port - }, "Akka Remote Service") + }, "Akka RemoteModule Service") def startRemoteService = remoteServerThread.start diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d9ded4b0e2..443b28041f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -287,7 +287,7 @@ abstract class RemoteClient private[akka] ( Some(futureResult) } } else { - val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) + val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) notifyListeners(RemoteClientError(exception, module, remoteAddress)) throw exception } @@ -597,10 +597,10 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with if (optimizeLocalScoped_?) { val home = this.address if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals? - return new LocalActorRef(factory, None) // Code is much simpler with return + return new LocalActorRef(factory, None, false, "todo") // Code is much simpler with return } - val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), clientManaged = true) + val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), true, "todo") //ref.timeout = timeout //removed because setting default timeout should be done after construction ref } @@ -657,13 +657,13 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => def address = currentServer.get match { case s: Some[NettyRemoteServer] => s.get.address - case None => ReflectiveAccess.Remote.configDefaultAddress + case None => ReflectiveAccess.RemoteModule.configDefaultAddress } def name = currentServer.get match { case s: Some[NettyRemoteServer] => s.get.name case None => - val a = ReflectiveAccess.Remote.configDefaultAddress + val a = ReflectiveAccess.RemoteModule.configDefaultAddress "NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort } @@ -713,7 +713,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } /** - * Register Remote Actor by a specific 'id' passed as argument. + * Register RemoteModule Actor by a specific 'id' passed as argument. *

      * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ @@ -734,7 +734,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } /** - * Register Remote Session Actor by a specific 'id' passed as argument. + * Register RemoteModule Session Actor by a specific 'id' passed as argument. *

      * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ @@ -758,7 +758,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } /** - * Unregister Remote Actor that is registered using its 'id' field (not custom ID). + * Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID). */ def unregister(actorRef: ActorRef): Unit = guard withGuard { if (_isRunning.isOn) { @@ -768,7 +768,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } /** - * Unregister Remote Actor by specific 'id'. + * Unregister RemoteModule Actor by specific 'id'. *

      * NOTE: You need to call this method if you have registered an actor by a custom ID. */ @@ -784,7 +784,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } /** - * Unregister Remote Actor by specific 'id'. + * Unregister RemoteModule Actor by specific 'id'. *

      * NOTE: You need to call this method if you have registered an actor by a custom ID. */ @@ -795,7 +795,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } /** - * Unregister Remote Typed Actor by specific 'id'. + * Unregister RemoteModule Typed Actor by specific 'id'. *

      * NOTE: You need to call this method if you have registered an actor by a custom ID. */ @@ -807,7 +807,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => } /** - * Unregister Remote Typed Actor by specific 'id'. + * Unregister RemoteModule Typed Actor by specific 'id'. *

      * NOTE: You need to call this method if you have registered an actor by a custom ID. */ @@ -972,10 +972,10 @@ class RemoteServerHandler( message match { // first match on system messages case RemoteActorSystemMessage.Stop => - if (UNTRUSTED_MODE) throw new SecurityException("Remote server is operating is untrusted mode, can not stop the actor") + if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop case _: LifeCycleMessage if (UNTRUSTED_MODE) => - throw new SecurityException("Remote server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor") + throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a LifeCycleMessage to the remote actor") case _ => // then match on user defined messages if (request.getOneWay) actorRef.!(message)(sender) @@ -1139,7 +1139,7 @@ class RemoteServerHandler( try { if (UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + "RemoteModule server is operating is untrusted mode, can not create remote actors on behalf of the remote client") val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) @@ -1205,7 +1205,7 @@ class RemoteServerHandler( try { if (UNTRUSTED_MODE) throw new SecurityException( - "Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client") + "RemoteModule server is operating is untrusted mode, can not create remote actors on behalf of the remote client") val (interfaceClass, targetClass) = if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname), @@ -1230,13 +1230,13 @@ class RemoteServerHandler( server.findTypedActorByIdOrUuid(actorInfo.getId, parseUuid(uuid).toString) match { case null => // the actor has not been registered globally. See if we have it in the session createTypedSessionActor(actorInfo, channel) match { - case null => + case null => // FIXME this is broken, if a user tries to get a server-managed typed actor and that is not registered then a client-managed typed actor is created, but just throwing an exception here causes client-managed typed actors to fail - + /* val e = new RemoteServerException("Can't load remote Typed Actor for [" + actorInfo.getId + "]") EventHandler.error(e, this, e.getMessage) server.notifyListeners(RemoteServerError(e, server)) - throw e + throw e */ createClientManagedTypedActor(actorInfo) // client-managed actor case sessionActor => sessionActor } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 7ad0c1e443..149dca68b6 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -157,11 +157,11 @@ object ActorSerialization { } private def fromBinaryToLocalActorRef[T <: Actor]( - bytes: Array[Byte], - homeAddress: Option[InetSocketAddress], + bytes: Array[Byte], + homeAddress: Option[InetSocketAddress], format: Format[T]): ActorRef = { val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes) - homeAddress.foreach { addr => + homeAddress.foreach { addr => val addressProtocol = AddressProtocol.newBuilder.setHostname(addr.getAddress.getHostAddress).setPort(addr.getPort).build builder.setOriginalAddress(addressProtocol) } @@ -219,7 +219,8 @@ object ActorSerialization { supervisor, hotswap, factory, - homeAddress) + homeAddress, + "address") // FIXME grab real address and use that val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) diff --git a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala index 4538489191..8aa0a7307e 100644 --- a/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala +++ b/akka-remote/src/test/scala/remote/AkkaRemoteTest.scala @@ -52,7 +52,7 @@ class AkkaRemoteTest extends override def afterEach() { remote.shutdown - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll super.afterEach } diff --git a/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala index dc4559a46b..eee7e5e690 100644 --- a/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala +++ b/akka-remote/src/test/scala/remote/RemoteErrorHandlingNetworkTest.scala @@ -53,7 +53,7 @@ object RemoteErrorHandlingNetworkTest { class RemoteErrorHandlingNetworkTest extends AkkaRemoteTest with NetworkFailureTest { import RemoteErrorHandlingNetworkTest._ - "Remote actors" should { + "RemoteModule actors" should { "be able to recover from network drop without loosing any messages" in { validateSudo() diff --git a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala index 4026418d18..d75b36a628 100644 --- a/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteSupervisorSpec.scala @@ -77,7 +77,7 @@ class RemoteSupervisorSpec extends AkkaRemoteTest { import Log._ - "Remote supervision" should { + "RemoteModule supervision" should { "start server" in { Log.messageLog.clear diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index 988236b85b..bd772e65a0 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -57,7 +57,7 @@ class RemoteTypedActorSpec extends AkkaRemoteTest { Thread.sleep(1000) } - "Remote Typed Actor " should { + "RemoteModule Typed Actor " should { /*"receives one-way message" in { val ta = conf.getInstance(classOf[RemoteTypedActorOne]) diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 88a5ec8ec3..0f20cc22ac 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -96,10 +96,10 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest { implicit val sender = replyHandler(latch, "Pong") remote.register(actorOf[RemoteActorSpecActorUnidirectional]) val actor = remote.actorFor("akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", host, port) - val numberOfActorsInRegistry = Actor.registry.actors.length + val numberOfActorsInRegistry = Actor.registry.local.actors.length actor ! "Ping" latch.await(1, TimeUnit.SECONDS) must be (true) - numberOfActorsInRegistry must equal (Actor.registry.actors.length) + numberOfActorsInRegistry must equal (Actor.registry.local.actors.length) } "UseServiceNameAsIdForRemoteActorRef" in { @@ -170,6 +170,8 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest { latch.await(3,TimeUnit.SECONDS) must be (true) } + + /** FIXME rewrite after new registry changes "should be able to remotely communicate between 2 server-managed actors" in { val localFoo = actorOf[Decrementer] val localBar = actorOf[Decrementer] @@ -194,11 +196,11 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest { latch.countDown } - val decrementers = Actor.registry.actorsFor[Decrementer] - decrementers must have size(2) //No new are allowed to have been created + val decrementer = Actor.registry.local.actorFor[Decrementer] decrementers.find( _ eq localFoo) must equal (Some(localFoo)) decrementers.find( _ eq localBar) must equal (Some(localBar)) } + */ } } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index 48520e2b34..05adc49e71 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -39,10 +39,10 @@ class ServerInitiatedRemoteTypedActorSpec extends AkkaRemoteTest { "should not recreate registered actors" in { val actor = createRemoteActorRef - val numberOfActorsInRegistry = Actor.registry.actors.length + val numberOfActorsInRegistry = Actor.registry.local.actors.length actor.oneWay oneWayLog.poll(5, TimeUnit.SECONDS) must equal ("oneway") - numberOfActorsInRegistry must be (Actor.registry.actors.length) + numberOfActorsInRegistry must be (Actor.registry.local.actors.length) } "should support multiple variants to get the actor from client side" in { diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 813c26ab94..f74d1598f3 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -710,7 +710,7 @@ object TypedActor { * Get the underlying typed actor for the given Typed Actor. */ def actorFor(proxy: AnyRef): Option[ActorRef] = - Actor.registry find { + Actor.registry.local find { case a if a.actor.isInstanceOf[TypedActor] && a.actor.asInstanceOf[TypedActor].proxy == proxy => a } @@ -728,6 +728,12 @@ object TypedActor { /** * Links an other Typed Actor to this Typed Actor. + * + * Example linking another typed actor from within a typed actor: + *

      +   *   TypedActor.link(getContext(), child);
      +   * 
      + * * @param supervisor the supervisor Typed Actor * @param supervised the Typed Actor to link */ @@ -741,6 +747,12 @@ object TypedActor { /** * Links an other Typed Actor to this Typed Actor and sets the fault handling for the supervisor. + * + * Example linking another typed actor from within a typed actor: + *
      +   *   TypedActor.link(getContext(), child, faultHandler);
      +   * 
      + * * @param supervisor the supervisor Typed Actor * @param supervised the Typed Actor to link * @param handler fault handling strategy @@ -758,6 +770,12 @@ object TypedActor { /** * Unlink the supervised Typed Actor from the supervisor. + * + * Example unlinking another typed actor from within a typed actor: + *
      +   *   TypedActor.unlink(getContext(), child);
      +   * 
      + * * @param supervisor the supervisor Typed Actor * @param supervised the Typed Actor to unlink */ diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/Issue675Spec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/Issue675Spec.scala index e978b61c45..9255281099 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/Issue675Spec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/Issue675Spec.scala @@ -38,7 +38,7 @@ class Issue675Spec extends BeforeAndAfterEach { override def afterEach() { - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } describe("TypedActor preStart method") { diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala index 0a031026ef..e75ad5617a 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorRegistrySpec.scala @@ -15,61 +15,5 @@ class TypedActorRegistrySpec extends WordSpec with MustMatchers { import TypedActorRegistrySpec._ "Typed Actor" should { - - "be able to be retreived from the registry by class" in { - Actor.registry.shutdownAll - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val actors = Actor.registry.typedActorsFor(classOf[My]) - actors.length must be (1) - Actor.registry.shutdownAll - } - - "be able to be retreived from the registry by manifest" in { - Actor.registry.shutdownAll - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val option = Actor.registry.typedActorFor[My] - option must not be (null) - option.isDefined must be (true) - Actor.registry.shutdownAll - } - - "be able to be retreived from the registry by class two times" in { - Actor.registry.shutdownAll - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val actors1 = Actor.registry.typedActorsFor(classOf[My]) - actors1.length must be (1) - val actors2 = Actor.registry.typedActorsFor(classOf[My]) - actors2.length must be (1) - Actor.registry.shutdownAll - } - - "be able to be retreived from the registry by manifest two times" in { - Actor.registry.shutdownAll - val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000) - val option1 = Actor.registry.typedActorFor[My] - option1 must not be (null) - option1.isDefined must be (true) - val option2 = Actor.registry.typedActorFor[My] - option2 must not be (null) - option2.isDefined must be (true) - Actor.registry.shutdownAll - } - - "be able to be retreived from the registry by manifest two times (even when created in supervisor)" in { - Actor.registry.shutdownAll - val manager = new TypedActorConfigurator - manager.configure( - OneForOneStrategy(classOf[Exception] :: Nil, 3, 1000), - Array(new SuperviseTypedActor(classOf[My], classOf[MyImpl], Permanent, 6000)) - ).supervise - - val option1 = Actor.registry.typedActorFor[My] - option1 must not be (null) - option1.isDefined must be (true) - val option2 = Actor.registry.typedActorFor[My] - option2 must not be (null) - option2.isDefined must be (true) - Actor.registry.shutdownAll - } } } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala index f871f98841..897e809a96 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorSpec.scala @@ -68,7 +68,7 @@ class TypedActorSpec extends } override def afterEach() { - Actor.registry.shutdownAll + Actor.registry.local.shutdownAll } describe("TypedActor") { @@ -120,67 +120,48 @@ class TypedActorSpec extends it("should support finding a typed actor by uuid ") { val typedActorRef = TypedActor.actorFor(simplePojo).get val uuid = typedActorRef.uuid - assert(Actor.registry.typedActorFor(newUuid()) === None) - assert(Actor.registry.typedActorFor(uuid).isDefined) - assert(Actor.registry.typedActorFor(uuid).get === simplePojo) + assert(Actor.registry.local.typedActorFor(newUuid()) === None) + assert(Actor.registry.local.typedActorFor(uuid).isDefined) + assert(Actor.registry.local.typedActorFor(uuid).get === simplePojo) } - it("should support finding typed actors by id ") { - val typedActors = Actor.registry.typedActorsFor("my-custom-id") - assert(typedActors.length === 1) - assert(typedActors.contains(pojo)) - - // creating untyped actor with same custom id - val actorRef = Actor.actorOf[MyActor].start - val typedActors2 = Actor.registry.typedActorsFor("my-custom-id") - assert(typedActors2.length === 1) - assert(typedActors2.contains(pojo)) - actorRef.stop + it("should support finding a typed actor by address ") { + val typedActorRef = TypedActor.actorFor(simplePojo).get + val address = typedActorRef.address + assert(Actor.registry.local.typedActorFor(newUuid().toString) === None) + assert(Actor.registry.local.typedActorFor(address).isDefined) + assert(Actor.registry.local.typedActorFor(address).get === simplePojo) } it("should support to filter typed actors") { - val actors = Actor.registry.filterTypedActors(ta => ta.isInstanceOf[MyTypedActor]) + val actors = Actor.registry.local.filterTypedActors(ta => ta.isInstanceOf[MyTypedActor]) assert(actors.length === 1) assert(actors.contains(pojo)) } - it("should support to find typed actors by class") { - val actors = Actor.registry.typedActorsFor(classOf[MyTypedActorImpl]) - assert(actors.length === 1) - assert(actors.contains(pojo)) - assert(Actor.registry.typedActorsFor(classOf[MyActor]).isEmpty) - } - it("should support to get all typed actors") { - val actors = Actor.registry.typedActors + val actors = Actor.registry.local.typedActors assert(actors.length === 2) assert(actors.contains(pojo)) assert(actors.contains(simplePojo)) } - it("should support to find typed actors by manifest") { - val actors = Actor.registry.typedActorsFor[MyTypedActorImpl] - assert(actors.length === 1) - assert(actors.contains(pojo)) - assert(Actor.registry.typedActorsFor[MyActor].isEmpty) - } - it("should support foreach for typed actors") { val actorRef = Actor.actorOf[MyActor].start - assert(Actor.registry.actors.size === 3) - assert(Actor.registry.typedActors.size === 2) - Actor.registry.foreachTypedActor(TypedActor.stop(_)) - assert(Actor.registry.actors.size === 1) - assert(Actor.registry.typedActors.size === 0) + assert(Actor.registry.local.actors.size === 3) + assert(Actor.registry.local.typedActors.size === 2) + Actor.registry.local.foreachTypedActor(TypedActor.stop(_)) + assert(Actor.registry.local.actors.size === 1) + assert(Actor.registry.local.typedActors.size === 0) } it("should shutdown all typed and untyped actors") { val actorRef = Actor.actorOf[MyActor].start - assert(Actor.registry.actors.size === 3) - assert(Actor.registry.typedActors.size === 2) - Actor.registry.shutdownAll() - assert(Actor.registry.actors.size === 0) - assert(Actor.registry.typedActors.size === 0) + assert(Actor.registry.local.actors.size === 3) + assert(Actor.registry.local.typedActors.size === 2) + Actor.registry.local.shutdownAll() + assert(Actor.registry.local.actors.size === 0) + assert(Actor.registry.local.typedActors.size === 0) } } -} \ No newline at end of file +}