diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 6ef64df1b2..da6115ef5f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -43,6 +43,10 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian val root = system.asInstanceOf[ActorSystemImpl].lookupRoot + def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match { + case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems + }) + "An ActorSystem" must { "find actors by looking up their path" in { @@ -101,14 +105,18 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { system.actorFor("system/") must be === syst } - "return deadLetters for non-existing paths" in { - system.actorFor("a/b/c") must be === system.deadLetters - system.actorFor("") must be === system.deadLetters - system.actorFor("akka://all-systems/Nobody") must be === system.deadLetters - system.actorFor("akka://all-systems/user") must be === system.deadLetters - system.actorFor(system / "hallo") must be === system.deadLetters - system.actorFor(Seq()) must be === system.deadLetters - system.actorFor(Seq("a")) must be === system.deadLetters + "return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in { + def check(lookup: ActorRef, result: ActorRef) = { + lookup.getClass must be === result.getClass + lookup must be === result + } + check(system.actorFor("a/b/c"), empty("a/b/c")) + check(system.actorFor(""), system.deadLetters) + check(system.actorFor("akka://all-systems/Nobody"), system.deadLetters) + check(system.actorFor("akka://all-systems/user"), system.deadLetters) + check(system.actorFor(system / "hallo"), empty("user/hallo")) + check(system.actorFor(Seq()), system.deadLetters) + check(system.actorFor(Seq("a")), empty("a")) } "find temporary actors" in { @@ -119,13 +127,14 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { system.actorFor(a.path.toString) must be === a system.actorFor(a.path.elements) must be === a system.actorFor(a.path.toString + "/") must be === a - system.actorFor(a.path.toString + "/hallo") must be === system.deadLetters + system.actorFor(a.path.toString + "/hallo").isTerminated must be === true f.isCompleted must be === false + a.isTerminated must be === false a ! 42 f.isCompleted must be === true Await.result(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread - awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second) + awaitCond(system.actorFor(a.path).isTerminated, 1 second) } } @@ -195,21 +204,27 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { for (target ← Seq(root, syst, user, system.deadLetters)) check(target) } - "return deadLetters for non-existing paths" in { + "return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in { import scala.collection.JavaConverters._ - def checkOne(looker: ActorRef, query: Query) { - Await.result(looker ? query, timeout.duration) must be === system.deadLetters + def checkOne(looker: ActorRef, query: Query, result: ActorRef) { + val lookup = Await.result(looker ? query, timeout.duration) + lookup.getClass must be === result.getClass + lookup must be === result } def check(looker: ActorRef) { - Seq(LookupString("a/b/c"), - LookupString(""), - LookupString("akka://all-systems/Nobody"), - LookupPath(system / "hallo"), - LookupPath(looker.path child "hallo"), // test Java API - LookupPath(looker.path descendant Seq("a", "b").asJava), // test Java API - LookupElems(Seq()), - LookupElems(Seq("a"))) foreach (checkOne(looker, _)) + val lookname = looker.path.elements.mkString("", "/", "/") + for ( + (l, r) ← Seq( + LookupString("a/b/c") -> empty(lookname + "a/b/c"), + LookupString("") -> system.deadLetters, + LookupString("akka://all-systems/Nobody") -> system.deadLetters, + LookupPath(system / "hallo") -> empty("user/hallo"), + LookupPath(looker.path child "hallo") -> empty(lookname + "hallo"), // test Java API + LookupPath(looker.path descendant Seq("a", "b").asJava) -> empty(lookname + "a/b"), // test Java API + LookupElems(Seq()) -> system.deadLetters, + LookupElems(Seq("a")) -> empty(lookname + "a")) + ) checkOne(looker, l, r) } for (looker ← all) check(looker) } @@ -228,11 +243,12 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a f.isCompleted must be === false + a.isTerminated must be === false a ! 42 f.isCompleted must be === true Await.result(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread - awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second) + awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration).asInstanceOf[ActorRef].isTerminated, 1 second) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 83c82e443b..ceea20b2c7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -281,7 +281,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'" } - "must return deadLetters on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { + "must return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { import java.io._ val baos = new ByteArrayOutputStream(8192 * 32) @@ -297,7 +297,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - in.readObject must be === system.deadLetters + in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing") } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 5abf768c22..30828c1014 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -4,18 +4,28 @@ package akka.actor -import org.scalatest.BeforeAndAfterEach import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { - def startWatching(target: ActorRef) = system.actorOf(Props(new Actor { +class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec + +object DeathWatchSpec { + def props(target: ActorRef, testActor: ActorRef) = Props(new Actor { context.watch(target) def receive = { case x ⇒ testActor forward x } - })) + }) +} + +trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout ⇒ + + import DeathWatchSpec._ + + lazy val supervisor = system.actorOf(Props[Supervisor], "watchers") + + def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds) "The Death Watch" must { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 2414a173cf..629fb814c4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -54,5 +54,14 @@ class LocalActorRefProviderSpec extends AkkaSpec { } } + "throw suitable exceptions for malformed actor names" in { + intercept[InvalidActorNameException](system.actorOf(Props.empty, null)).getMessage.contains("null") must be(true) + intercept[InvalidActorNameException](system.actorOf(Props.empty, "")).getMessage.contains("empty") must be(true) + intercept[InvalidActorNameException](system.actorOf(Props.empty, "$hallo")).getMessage.contains("conform") must be(true) + intercept[InvalidActorNameException](system.actorOf(Props.empty, "a%")).getMessage.contains("conform") must be(true) + intercept[InvalidActorNameException](system.actorOf(Props.empty, "a?")).getMessage.contains("conform") must be(true) + intercept[InvalidActorNameException](system.actorOf(Props.empty, "üß")).getMessage.contains("conform") must be(true) + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 711e7cc3a3..24fce420ac 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -10,6 +10,9 @@ import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await import akka.util.Duration +import akka.config.ConfigurationException +import akka.routing.FromConfig +import com.typesafe.config.ConfigFactory object RoutingSpec { @@ -371,6 +374,25 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { }), "Actor:" + id) } + "router FromConfig" must { + "throw suitable exception when not configured" in { + intercept[ConfigurationException] { + system.actorOf(Props.empty.withRouter(FromConfig)) + }.getMessage.contains("application.conf") must be(true) + } + + "allow external configuration" in { + val sys = ActorSystem("FromConfig", ConfigFactory + .parseString("akka.actor.deployment./routed.router=round-robin") + .withFallback(system.settings.config)) + try { + sys.actorOf(Props.empty.withRouter(FromConfig), "routed") + } finally { + sys.shutdown() + } + } + } + "custom router" must { "be started when constructed" in { val routedActor = system.actorOf(Props[TestActor].withRouter(VoteCountRouter)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 69b8c0b6c9..c9133e2235 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -233,8 +233,13 @@ private[akka] class ActorCell( def actorOf(props: Props): ActorRef = _actorOf(props, randomName()) def actorOf(props: Props, name: String): ActorRef = { - if (name == null || name == "" || name.charAt(0) == '$') - throw new InvalidActorNameException("actor name must not be null, empty or start with $") + import ActorPath.ElementRegex + name match { + case null ⇒ throw new InvalidActorNameException("actor name must not be null") + case "" ⇒ throw new InvalidActorNameException("actor name must not be empty") + case ElementRegex() ⇒ // this is fine + case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) + } if (childrenRefs contains name) throw new InvalidActorNameException("actor name " + name + " is not unique!") _actorOf(props, name) diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index a36c5e8973..2aa48a093d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -15,6 +15,8 @@ object ActorPath { } rec(s.length, Nil) } + + val ElementRegex = """[-\w:@&=+,.!~*'_;][-\w:@&=+,.!~*'$_;]*""".r } /** @@ -57,7 +59,7 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable { /** * Recursively create a descendant’s path by appending all child names. */ - def /(child: Iterable[String]): ActorPath = (this /: child)(_ / _) + def /(child: Iterable[String]): ActorPath = (this /: child)((path, elem) ⇒ if (elem.isEmpty) path else path / elem) /** * ''Java API'': Recursively create a descendant’s path by appending all child names. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index ee53fec688..44ff94329c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -208,13 +208,26 @@ trait ScalaActorRef { ref: ActorRef ⇒ def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) } +/** + * All ActorRefs have a scope which describes where they live. Since it is + * often necessary to distinguish between local and non-local references, this + * is the only method provided on the scope. + */ +trait ActorRefScope { + def isLocal: Boolean +} + +trait LocalRef extends ActorRefScope { + final def isLocal = true +} + /** * Internal trait for assembling all the functionality needed internally on * ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE! * * DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA! */ -private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { +private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒ def resume(): Unit def suspend(): Unit def restart(cause: Throwable): Unit @@ -230,6 +243,11 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe * exist, return Nobody. */ def getChild(name: Iterator[String]): InternalActorRef + /** + * Scope: if this ref points to an actor which resides within the same JVM, + * i.e. whose mailbox is directly reachable etc. + */ + def isLocal: Boolean } private[akka] case object Nobody extends MinimalActorRef { @@ -247,7 +265,7 @@ private[akka] class LocalActorRef private[akka] ( val systemService: Boolean = false, _receiveTimeout: Option[Duration] = None, _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) - extends InternalActorRef { + extends InternalActorRef with LocalRef { /* * actorCell.start() publishes actorCell & this to the dispatcher, which @@ -359,7 +377,7 @@ private[akka] class LocalActorRef private[akka] ( def restart(cause: Throwable): Unit = actorCell.restart(cause) @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = SerializedActorRef(path.toString) + protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) } /** @@ -382,7 +400,7 @@ case class SerializedActorRef(path: String) { /** * Trait for ActorRef implementations where all methods contain default stubs. */ -trait MinimalActorRef extends InternalActorRef { +trait MinimalActorRef extends InternalActorRef with LocalRef { def getParent: InternalActorRef = Nobody def getChild(names: Iterator[String]): InternalActorRef = { @@ -405,6 +423,9 @@ trait MinimalActorRef extends InternalActorRef { def sendSystemMessage(message: SystemMessage): Unit = () def restart(cause: Throwable): Unit = () + + @throws(classOf[java.io.ObjectStreamException]) + protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) } object MinimalActorRef { @@ -436,8 +457,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { _path } - private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) { - _path = rootPath / "deadLetters" + private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) { + _path = path brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher) } @@ -456,7 +477,20 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { } @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = DeadLetterActorRef.serialized + override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized +} + +/** + * This special dead letter reference has a name: it is that which is returned + * by a local look-up which is unsuccessful. + */ +class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) + extends DeadLetterActorRef(_eventStream) { + init(_dispatcher, _path) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { + case d: DeadLetter ⇒ // do NOT form endless loops + case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) + } } class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef { @@ -530,7 +564,4 @@ class AskActorRef( override def stop(): Unit = if (running.getAndSet(false)) { deathWatch.publish(Terminated(this)) } - - @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = SerializedActorRef(path.toString) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 57ef9f108c..d5f9b5ed54 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -451,22 +451,33 @@ class LocalActorRefProvider( def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case RelativeActorPath(elems) ⇒ - if (elems.isEmpty) deadLetters - else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail) + if (elems.isEmpty) { + log.debug("look-up of empty path string '{}' fails (per definition)", path) + deadLetters + } else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail) else actorFor(ref, elems) case LocalActorPath(address, elems) if address == rootPath.address ⇒ actorFor(rootGuardian, elems) - case _ ⇒ deadLetters + case _ ⇒ + log.debug("look-up of unknown path '{}' failed", path) + deadLetters } def actorFor(path: ActorPath): InternalActorRef = if (path.root == rootPath) actorFor(rootGuardian, path.elements) - else deadLetters + else { + log.debug("look-up of foreign ActorPath '{}' failed", path) + deadLetters + } def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = - if (path.isEmpty) deadLetters - else ref.getChild(path.iterator) match { - case Nobody ⇒ deadLetters - case x ⇒ x + if (path.isEmpty) { + log.debug("look-up of empty path sequence fails (per definition)") + deadLetters + } else ref.getChild(path.iterator) match { + case Nobody ⇒ + log.debug("look-up of path sequence '{}' failed", path) + new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path) + case x ⇒ x } def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index dbe5630789..0e95325ec3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -381,7 +381,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor private lazy val _start: this.type = { // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) - deadLetters.init(dispatcher, provider.rootPath) + deadLetters.init(dispatcher, lookupRoot.path / "deadLetters") // this starts the reaper actor and the user-configured logging subscribers, which are also actors registerOnTermination(stopScheduler()) _locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch) diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala index 8bbcdd15e6..d4fd1badd5 100644 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -18,8 +18,8 @@ class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val de val soul = iter.next() deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere soul.getKey match { - case _: LocalActorRef ⇒ // nothing to do, they know what they signed up for - case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure + case _: LocalRef ⇒ // nothing to do, they know what they signed up for + case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 5abdd32438..db200d09d6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -6,7 +6,6 @@ package akka.dispatch import java.util.concurrent.TimeUnit import java.util.concurrent.ConcurrentHashMap -import akka.actor.LocalActorRef import akka.actor.newUuid import akka.util.{ Duration, ReflectiveAccess } import akka.actor.ActorSystem diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 86dc9d80d4..0c02952b3e 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -7,6 +7,7 @@ import akka.actor._ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ import akka.util.{ Duration, Timeout } +import akka.config.ConfigurationException /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -170,6 +171,22 @@ case object NoRouter extends RouterConfig { def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = null } +/** + * Router configuration which has no default, i.e. external configuration is required. + */ +case object FromConfig extends RouterConfig { + def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = + throw new ConfigurationException("router " + ref + " needs external configuration from file (e.g. application.conf)") +} + +/** + * Java API: Router configuration which has no default, i.e. external configuration is required. + */ +case class FromConfig() extends RouterConfig { + def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = + throw new ConfigurationException("router " + ref + " needs external configuration from file (e.g. application.conf)") +} + object RoundRobinRouter { def apply(routees: Iterable[ActorRef]) = new RoundRobinRouter(routees = routees map (_.path.toString)) } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index c656ab37b1..a03715b4b8 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -26,7 +26,7 @@ object Helpers { def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) } - final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+%" + final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~" @tailrec def base64(l: Long, sb: StringBuilder = new StringBuilder("$")): String = { diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index cd9186388e..42bece7d6a 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -6,7 +6,6 @@ package akka.actor.mailbox import com.surftools.BeanstalkClient._ import com.surftools.BeanstalkClientImpl._ import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException import akka.actor.ActorContext diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala index 8a714147f0..f14c97010b 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala @@ -10,8 +10,8 @@ import org.bson.io.OutputBuffer import org.bson.types.ObjectId import java.io.InputStream import org.bson.collection._ +import akka.actor.{ ActorRef, ActorSystem } import akka.dispatch.Envelope -import akka.actor.{ ActorSystem, LocalActorRef, ActorRef } /** * A container message for durable mailbox messages, which can be easily stuffed into diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 6d0f173bbf..8c7587ec00 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -4,7 +4,6 @@ package akka.actor.mailbox import com.redis._ -import akka.actor.LocalActorRef import akka.AkkaException import akka.actor.ActorContext import akka.dispatch.Envelope diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 4309da402a..117acac383 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -4,7 +4,6 @@ package akka.actor.mailbox import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException import org.I0Itec.zkclient.serialize._ diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index e5af760f40..4febbafe6f 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -1,6 +1,6 @@ package akka.actor.mailbox -import akka.actor.{ Actor, LocalActorRef } +import akka.actor.Actor import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index e7e8c15def..663e534bb2 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -218,111 +218,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(enum_scope:ReplicationStrategyType) } - public enum RemoteSystemDaemonMessageType - implements com.google.protobuf.ProtocolMessageEnum { - STOP(0, 1), - USE(1, 2), - RELEASE(2, 3), - MAKE_AVAILABLE(3, 4), - MAKE_UNAVAILABLE(4, 5), - DISCONNECT(5, 6), - RECONNECT(6, 7), - RESIGN(7, 8), - GOSSIP(8, 9), - FAIL_OVER_CONNECTIONS(9, 20), - FUNCTION_FUN0_UNIT(10, 21), - FUNCTION_FUN0_ANY(11, 22), - FUNCTION_FUN1_ARG_UNIT(12, 23), - FUNCTION_FUN1_ARG_ANY(13, 24), - ; - - public static final int STOP_VALUE = 1; - public static final int USE_VALUE = 2; - public static final int RELEASE_VALUE = 3; - public static final int MAKE_AVAILABLE_VALUE = 4; - public static final int MAKE_UNAVAILABLE_VALUE = 5; - public static final int DISCONNECT_VALUE = 6; - public static final int RECONNECT_VALUE = 7; - public static final int RESIGN_VALUE = 8; - public static final int GOSSIP_VALUE = 9; - public static final int FAIL_OVER_CONNECTIONS_VALUE = 20; - public static final int FUNCTION_FUN0_UNIT_VALUE = 21; - public static final int FUNCTION_FUN0_ANY_VALUE = 22; - public static final int FUNCTION_FUN1_ARG_UNIT_VALUE = 23; - public static final int FUNCTION_FUN1_ARG_ANY_VALUE = 24; - - - public final int getNumber() { return value; } - - public static RemoteSystemDaemonMessageType valueOf(int value) { - switch (value) { - case 1: return STOP; - case 2: return USE; - case 3: return RELEASE; - case 4: return MAKE_AVAILABLE; - case 5: return MAKE_UNAVAILABLE; - case 6: return DISCONNECT; - case 7: return RECONNECT; - case 8: return RESIGN; - case 9: return GOSSIP; - case 20: return FAIL_OVER_CONNECTIONS; - case 21: return FUNCTION_FUN0_UNIT; - case 22: return FUNCTION_FUN0_ANY; - case 23: return FUNCTION_FUN1_ARG_UNIT; - case 24: return FUNCTION_FUN1_ARG_ANY; - default: return null; - } - } - - public static com.google.protobuf.Internal.EnumLiteMap - internalGetValueMap() { - return internalValueMap; - } - private static com.google.protobuf.Internal.EnumLiteMap - internalValueMap = - new com.google.protobuf.Internal.EnumLiteMap() { - public RemoteSystemDaemonMessageType findValueByNumber(int number) { - return RemoteSystemDaemonMessageType.valueOf(number); - } - }; - - public final com.google.protobuf.Descriptors.EnumValueDescriptor - getValueDescriptor() { - return getDescriptor().getValues().get(index); - } - public final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptorForType() { - return getDescriptor(); - } - public static final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptor() { - return akka.remote.RemoteProtocol.getDescriptor().getEnumTypes().get(3); - } - - private static final RemoteSystemDaemonMessageType[] VALUES = { - STOP, USE, RELEASE, MAKE_AVAILABLE, MAKE_UNAVAILABLE, DISCONNECT, RECONNECT, RESIGN, GOSSIP, FAIL_OVER_CONNECTIONS, FUNCTION_FUN0_UNIT, FUNCTION_FUN0_ANY, FUNCTION_FUN1_ARG_UNIT, FUNCTION_FUN1_ARG_ANY, - }; - - public static RemoteSystemDaemonMessageType valueOf( - com.google.protobuf.Descriptors.EnumValueDescriptor desc) { - if (desc.getType() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "EnumValueDescriptor is not for this type."); - } - return VALUES[desc.getIndex()]; - } - - private final int index; - private final int value; - - private RemoteSystemDaemonMessageType(int index, int value) { - this.index = index; - this.value = value; - } - - // @@protoc_insertion_point(enum_scope:RemoteSystemDaemonMessageType) - } - public interface AkkaRemoteProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -3349,417 +3244,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:MessageProtocol) } - public interface UuidProtocolOrBuilder - extends com.google.protobuf.MessageOrBuilder { - - // required uint64 high = 1; - boolean hasHigh(); - long getHigh(); - - // required uint64 low = 2; - boolean hasLow(); - long getLow(); - } - public static final class UuidProtocol extends - com.google.protobuf.GeneratedMessage - implements UuidProtocolOrBuilder { - // Use UuidProtocol.newBuilder() to construct. - private UuidProtocol(Builder builder) { - super(builder); - } - private UuidProtocol(boolean noInit) {} - - private static final UuidProtocol defaultInstance; - public static UuidProtocol getDefaultInstance() { - return defaultInstance; - } - - public UuidProtocol getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_UuidProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_UuidProtocol_fieldAccessorTable; - } - - private int bitField0_; - // required uint64 high = 1; - public static final int HIGH_FIELD_NUMBER = 1; - private long high_; - public boolean hasHigh() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public long getHigh() { - return high_; - } - - // required uint64 low = 2; - public static final int LOW_FIELD_NUMBER = 2; - private long low_; - public boolean hasLow() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public long getLow() { - return low_; - } - - private void initFields() { - high_ = 0L; - low_ = 0L; - } - private byte memoizedIsInitialized = -1; - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized != -1) return isInitialized == 1; - - if (!hasHigh()) { - memoizedIsInitialized = 0; - return false; - } - if (!hasLow()) { - memoizedIsInitialized = 0; - return false; - } - memoizedIsInitialized = 1; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - getSerializedSize(); - if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeUInt64(1, high_); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeUInt64(2, low_); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (((bitField0_ & 0x00000001) == 0x00000001)) { - size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(1, high_); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(2, low_); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - private static final long serialVersionUID = 0L; - @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { - return super.writeReplace(); - } - - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.UuidProtocol parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.UuidProtocol parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input, extensionRegistry)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.UuidProtocol parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(akka.remote.RemoteProtocol.UuidProtocol prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder - implements akka.remote.RemoteProtocol.UuidProtocolOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_UuidProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_UuidProtocol_fieldAccessorTable; - } - - // Construct using akka.remote.RemoteProtocol.UuidProtocol.newBuilder() - private Builder() { - maybeForceBuilderInitialization(); - } - - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { - super(parent); - maybeForceBuilderInitialization(); - } - private void maybeForceBuilderInitialization() { - if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - } - } - private static Builder create() { - return new Builder(); - } - - public Builder clear() { - super.clear(); - high_ = 0L; - bitField0_ = (bitField0_ & ~0x00000001); - low_ = 0L; - bitField0_ = (bitField0_ & ~0x00000002); - return this; - } - - public Builder clone() { - return create().mergeFrom(buildPartial()); - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return akka.remote.RemoteProtocol.UuidProtocol.getDescriptor(); - } - - public akka.remote.RemoteProtocol.UuidProtocol getDefaultInstanceForType() { - return akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance(); - } - - public akka.remote.RemoteProtocol.UuidProtocol build() { - akka.remote.RemoteProtocol.UuidProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - private akka.remote.RemoteProtocol.UuidProtocol buildParsed() - throws com.google.protobuf.InvalidProtocolBufferException { - akka.remote.RemoteProtocol.UuidProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException( - result).asInvalidProtocolBufferException(); - } - return result; - } - - public akka.remote.RemoteProtocol.UuidProtocol buildPartial() { - akka.remote.RemoteProtocol.UuidProtocol result = new akka.remote.RemoteProtocol.UuidProtocol(this); - int from_bitField0_ = bitField0_; - int to_bitField0_ = 0; - if (((from_bitField0_ & 0x00000001) == 0x00000001)) { - to_bitField0_ |= 0x00000001; - } - result.high_ = high_; - if (((from_bitField0_ & 0x00000002) == 0x00000002)) { - to_bitField0_ |= 0x00000002; - } - result.low_ = low_; - result.bitField0_ = to_bitField0_; - onBuilt(); - return result; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof akka.remote.RemoteProtocol.UuidProtocol) { - return mergeFrom((akka.remote.RemoteProtocol.UuidProtocol)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(akka.remote.RemoteProtocol.UuidProtocol other) { - if (other == akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance()) return this; - if (other.hasHigh()) { - setHigh(other.getHigh()); - } - if (other.hasLow()) { - setLow(other.getLow()); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - } - - public final boolean isInitialized() { - if (!hasHigh()) { - - return false; - } - if (!hasLow()) { - - return false; - } - return true; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder( - this.getUnknownFields()); - while (true) { - int tag = input.readTag(); - switch (tag) { - case 0: - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - } - break; - } - case 8: { - bitField0_ |= 0x00000001; - high_ = input.readUInt64(); - break; - } - case 16: { - bitField0_ |= 0x00000002; - low_ = input.readUInt64(); - break; - } - } - } - } - - private int bitField0_; - - // required uint64 high = 1; - private long high_ ; - public boolean hasHigh() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public long getHigh() { - return high_; - } - public Builder setHigh(long value) { - bitField0_ |= 0x00000001; - high_ = value; - onChanged(); - return this; - } - public Builder clearHigh() { - bitField0_ = (bitField0_ & ~0x00000001); - high_ = 0L; - onChanged(); - return this; - } - - // required uint64 low = 2; - private long low_ ; - public boolean hasLow() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public long getLow() { - return low_; - } - public Builder setLow(long value) { - bitField0_ |= 0x00000002; - low_ = value; - onChanged(); - return this; - } - public Builder clearLow() { - bitField0_ = (bitField0_ & ~0x00000002); - low_ = 0L; - onChanged(); - return this; - } - - // @@protoc_insertion_point(builder_scope:UuidProtocol) - } - - static { - defaultInstance = new UuidProtocol(true); - defaultInstance.initFields(); - } - - // @@protoc_insertion_point(class_scope:UuidProtocol) - } - public interface MetadataEntryProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -5246,764 +4730,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:ExceptionProtocol) } - public interface RemoteSystemDaemonMessageProtocolOrBuilder - extends com.google.protobuf.MessageOrBuilder { - - // required .RemoteSystemDaemonMessageType messageType = 1; - boolean hasMessageType(); - akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType getMessageType(); - - // optional string actorPath = 2; - boolean hasActorPath(); - String getActorPath(); - - // optional bytes payload = 3; - boolean hasPayload(); - com.google.protobuf.ByteString getPayload(); - - // optional .UuidProtocol replicateActorFromUuid = 4; - boolean hasReplicateActorFromUuid(); - akka.remote.RemoteProtocol.UuidProtocol getReplicateActorFromUuid(); - akka.remote.RemoteProtocol.UuidProtocolOrBuilder getReplicateActorFromUuidOrBuilder(); - - // optional string supervisor = 5; - boolean hasSupervisor(); - String getSupervisor(); - } - public static final class RemoteSystemDaemonMessageProtocol extends - com.google.protobuf.GeneratedMessage - implements RemoteSystemDaemonMessageProtocolOrBuilder { - // Use RemoteSystemDaemonMessageProtocol.newBuilder() to construct. - private RemoteSystemDaemonMessageProtocol(Builder builder) { - super(builder); - } - private RemoteSystemDaemonMessageProtocol(boolean noInit) {} - - private static final RemoteSystemDaemonMessageProtocol defaultInstance; - public static RemoteSystemDaemonMessageProtocol getDefaultInstance() { - return defaultInstance; - } - - public RemoteSystemDaemonMessageProtocol getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_RemoteSystemDaemonMessageProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable; - } - - private int bitField0_; - // required .RemoteSystemDaemonMessageType messageType = 1; - public static final int MESSAGETYPE_FIELD_NUMBER = 1; - private akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType messageType_; - public boolean hasMessageType() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType getMessageType() { - return messageType_; - } - - // optional string actorPath = 2; - public static final int ACTORPATH_FIELD_NUMBER = 2; - private java.lang.Object actorPath_; - public boolean hasActorPath() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public String getActorPath() { - java.lang.Object ref = actorPath_; - if (ref instanceof String) { - return (String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - String s = bs.toStringUtf8(); - if (com.google.protobuf.Internal.isValidUtf8(bs)) { - actorPath_ = s; - } - return s; - } - } - private com.google.protobuf.ByteString getActorPathBytes() { - java.lang.Object ref = actorPath_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8((String) ref); - actorPath_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - // optional bytes payload = 3; - public static final int PAYLOAD_FIELD_NUMBER = 3; - private com.google.protobuf.ByteString payload_; - public boolean hasPayload() { - return ((bitField0_ & 0x00000004) == 0x00000004); - } - public com.google.protobuf.ByteString getPayload() { - return payload_; - } - - // optional .UuidProtocol replicateActorFromUuid = 4; - public static final int REPLICATEACTORFROMUUID_FIELD_NUMBER = 4; - private akka.remote.RemoteProtocol.UuidProtocol replicateActorFromUuid_; - public boolean hasReplicateActorFromUuid() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - public akka.remote.RemoteProtocol.UuidProtocol getReplicateActorFromUuid() { - return replicateActorFromUuid_; - } - public akka.remote.RemoteProtocol.UuidProtocolOrBuilder getReplicateActorFromUuidOrBuilder() { - return replicateActorFromUuid_; - } - - // optional string supervisor = 5; - public static final int SUPERVISOR_FIELD_NUMBER = 5; - private java.lang.Object supervisor_; - public boolean hasSupervisor() { - return ((bitField0_ & 0x00000010) == 0x00000010); - } - public String getSupervisor() { - java.lang.Object ref = supervisor_; - if (ref instanceof String) { - return (String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - String s = bs.toStringUtf8(); - if (com.google.protobuf.Internal.isValidUtf8(bs)) { - supervisor_ = s; - } - return s; - } - } - private com.google.protobuf.ByteString getSupervisorBytes() { - java.lang.Object ref = supervisor_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8((String) ref); - supervisor_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - private void initFields() { - messageType_ = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.STOP; - actorPath_ = ""; - payload_ = com.google.protobuf.ByteString.EMPTY; - replicateActorFromUuid_ = akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance(); - supervisor_ = ""; - } - private byte memoizedIsInitialized = -1; - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized != -1) return isInitialized == 1; - - if (!hasMessageType()) { - memoizedIsInitialized = 0; - return false; - } - if (hasReplicateActorFromUuid()) { - if (!getReplicateActorFromUuid().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } - } - memoizedIsInitialized = 1; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - getSerializedSize(); - if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeEnum(1, messageType_.getNumber()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, getActorPathBytes()); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeBytes(3, payload_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - output.writeMessage(4, replicateActorFromUuid_); - } - if (((bitField0_ & 0x00000010) == 0x00000010)) { - output.writeBytes(5, getSupervisorBytes()); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (((bitField0_ & 0x00000001) == 0x00000001)) { - size += com.google.protobuf.CodedOutputStream - .computeEnumSize(1, messageType_.getNumber()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, getActorPathBytes()); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, payload_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - size += com.google.protobuf.CodedOutputStream - .computeMessageSize(4, replicateActorFromUuid_); - } - if (((bitField0_ & 0x00000010) == 0x00000010)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(5, getSupervisorBytes()); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - private static final long serialVersionUID = 0L; - @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { - return super.writeReplace(); - } - - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input, extensionRegistry)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder - implements akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocolOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_RemoteSystemDaemonMessageProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable; - } - - // Construct using akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.newBuilder() - private Builder() { - maybeForceBuilderInitialization(); - } - - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { - super(parent); - maybeForceBuilderInitialization(); - } - private void maybeForceBuilderInitialization() { - if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - getReplicateActorFromUuidFieldBuilder(); - } - } - private static Builder create() { - return new Builder(); - } - - public Builder clear() { - super.clear(); - messageType_ = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.STOP; - bitField0_ = (bitField0_ & ~0x00000001); - actorPath_ = ""; - bitField0_ = (bitField0_ & ~0x00000002); - payload_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000004); - if (replicateActorFromUuidBuilder_ == null) { - replicateActorFromUuid_ = akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance(); - } else { - replicateActorFromUuidBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000008); - supervisor_ = ""; - bitField0_ = (bitField0_ & ~0x00000010); - return this; - } - - public Builder clone() { - return create().mergeFrom(buildPartial()); - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.getDescriptor(); - } - - public akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol getDefaultInstanceForType() { - return akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.getDefaultInstance(); - } - - public akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol build() { - akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - private akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol buildParsed() - throws com.google.protobuf.InvalidProtocolBufferException { - akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException( - result).asInvalidProtocolBufferException(); - } - return result; - } - - public akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol buildPartial() { - akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol result = new akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol(this); - int from_bitField0_ = bitField0_; - int to_bitField0_ = 0; - if (((from_bitField0_ & 0x00000001) == 0x00000001)) { - to_bitField0_ |= 0x00000001; - } - result.messageType_ = messageType_; - if (((from_bitField0_ & 0x00000002) == 0x00000002)) { - to_bitField0_ |= 0x00000002; - } - result.actorPath_ = actorPath_; - if (((from_bitField0_ & 0x00000004) == 0x00000004)) { - to_bitField0_ |= 0x00000004; - } - result.payload_ = payload_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } - if (replicateActorFromUuidBuilder_ == null) { - result.replicateActorFromUuid_ = replicateActorFromUuid_; - } else { - result.replicateActorFromUuid_ = replicateActorFromUuidBuilder_.build(); - } - if (((from_bitField0_ & 0x00000010) == 0x00000010)) { - to_bitField0_ |= 0x00000010; - } - result.supervisor_ = supervisor_; - result.bitField0_ = to_bitField0_; - onBuilt(); - return result; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol) { - return mergeFrom((akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol other) { - if (other == akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.getDefaultInstance()) return this; - if (other.hasMessageType()) { - setMessageType(other.getMessageType()); - } - if (other.hasActorPath()) { - setActorPath(other.getActorPath()); - } - if (other.hasPayload()) { - setPayload(other.getPayload()); - } - if (other.hasReplicateActorFromUuid()) { - mergeReplicateActorFromUuid(other.getReplicateActorFromUuid()); - } - if (other.hasSupervisor()) { - setSupervisor(other.getSupervisor()); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - } - - public final boolean isInitialized() { - if (!hasMessageType()) { - - return false; - } - if (hasReplicateActorFromUuid()) { - if (!getReplicateActorFromUuid().isInitialized()) { - - return false; - } - } - return true; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder( - this.getUnknownFields()); - while (true) { - int tag = input.readTag(); - switch (tag) { - case 0: - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - } - break; - } - case 8: { - int rawValue = input.readEnum(); - akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType value = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.valueOf(rawValue); - if (value == null) { - unknownFields.mergeVarintField(1, rawValue); - } else { - bitField0_ |= 0x00000001; - messageType_ = value; - } - break; - } - case 18: { - bitField0_ |= 0x00000002; - actorPath_ = input.readBytes(); - break; - } - case 26: { - bitField0_ |= 0x00000004; - payload_ = input.readBytes(); - break; - } - case 34: { - akka.remote.RemoteProtocol.UuidProtocol.Builder subBuilder = akka.remote.RemoteProtocol.UuidProtocol.newBuilder(); - if (hasReplicateActorFromUuid()) { - subBuilder.mergeFrom(getReplicateActorFromUuid()); - } - input.readMessage(subBuilder, extensionRegistry); - setReplicateActorFromUuid(subBuilder.buildPartial()); - break; - } - case 42: { - bitField0_ |= 0x00000010; - supervisor_ = input.readBytes(); - break; - } - } - } - } - - private int bitField0_; - - // required .RemoteSystemDaemonMessageType messageType = 1; - private akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType messageType_ = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.STOP; - public boolean hasMessageType() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType getMessageType() { - return messageType_; - } - public Builder setMessageType(akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - messageType_ = value; - onChanged(); - return this; - } - public Builder clearMessageType() { - bitField0_ = (bitField0_ & ~0x00000001); - messageType_ = akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType.STOP; - onChanged(); - return this; - } - - // optional string actorPath = 2; - private java.lang.Object actorPath_ = ""; - public boolean hasActorPath() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public String getActorPath() { - java.lang.Object ref = actorPath_; - if (!(ref instanceof String)) { - String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - actorPath_ = s; - return s; - } else { - return (String) ref; - } - } - public Builder setActorPath(String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - actorPath_ = value; - onChanged(); - return this; - } - public Builder clearActorPath() { - bitField0_ = (bitField0_ & ~0x00000002); - actorPath_ = getDefaultInstance().getActorPath(); - onChanged(); - return this; - } - void setActorPath(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000002; - actorPath_ = value; - onChanged(); - } - - // optional bytes payload = 3; - private com.google.protobuf.ByteString payload_ = com.google.protobuf.ByteString.EMPTY; - public boolean hasPayload() { - return ((bitField0_ & 0x00000004) == 0x00000004); - } - public com.google.protobuf.ByteString getPayload() { - return payload_; - } - public Builder setPayload(com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000004; - payload_ = value; - onChanged(); - return this; - } - public Builder clearPayload() { - bitField0_ = (bitField0_ & ~0x00000004); - payload_ = getDefaultInstance().getPayload(); - onChanged(); - return this; - } - - // optional .UuidProtocol replicateActorFromUuid = 4; - private akka.remote.RemoteProtocol.UuidProtocol replicateActorFromUuid_ = akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance(); - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.UuidProtocol, akka.remote.RemoteProtocol.UuidProtocol.Builder, akka.remote.RemoteProtocol.UuidProtocolOrBuilder> replicateActorFromUuidBuilder_; - public boolean hasReplicateActorFromUuid() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - public akka.remote.RemoteProtocol.UuidProtocol getReplicateActorFromUuid() { - if (replicateActorFromUuidBuilder_ == null) { - return replicateActorFromUuid_; - } else { - return replicateActorFromUuidBuilder_.getMessage(); - } - } - public Builder setReplicateActorFromUuid(akka.remote.RemoteProtocol.UuidProtocol value) { - if (replicateActorFromUuidBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - replicateActorFromUuid_ = value; - onChanged(); - } else { - replicateActorFromUuidBuilder_.setMessage(value); - } - bitField0_ |= 0x00000008; - return this; - } - public Builder setReplicateActorFromUuid( - akka.remote.RemoteProtocol.UuidProtocol.Builder builderForValue) { - if (replicateActorFromUuidBuilder_ == null) { - replicateActorFromUuid_ = builderForValue.build(); - onChanged(); - } else { - replicateActorFromUuidBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000008; - return this; - } - public Builder mergeReplicateActorFromUuid(akka.remote.RemoteProtocol.UuidProtocol value) { - if (replicateActorFromUuidBuilder_ == null) { - if (((bitField0_ & 0x00000008) == 0x00000008) && - replicateActorFromUuid_ != akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance()) { - replicateActorFromUuid_ = - akka.remote.RemoteProtocol.UuidProtocol.newBuilder(replicateActorFromUuid_).mergeFrom(value).buildPartial(); - } else { - replicateActorFromUuid_ = value; - } - onChanged(); - } else { - replicateActorFromUuidBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000008; - return this; - } - public Builder clearReplicateActorFromUuid() { - if (replicateActorFromUuidBuilder_ == null) { - replicateActorFromUuid_ = akka.remote.RemoteProtocol.UuidProtocol.getDefaultInstance(); - onChanged(); - } else { - replicateActorFromUuidBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000008); - return this; - } - public akka.remote.RemoteProtocol.UuidProtocol.Builder getReplicateActorFromUuidBuilder() { - bitField0_ |= 0x00000008; - onChanged(); - return getReplicateActorFromUuidFieldBuilder().getBuilder(); - } - public akka.remote.RemoteProtocol.UuidProtocolOrBuilder getReplicateActorFromUuidOrBuilder() { - if (replicateActorFromUuidBuilder_ != null) { - return replicateActorFromUuidBuilder_.getMessageOrBuilder(); - } else { - return replicateActorFromUuid_; - } - } - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.UuidProtocol, akka.remote.RemoteProtocol.UuidProtocol.Builder, akka.remote.RemoteProtocol.UuidProtocolOrBuilder> - getReplicateActorFromUuidFieldBuilder() { - if (replicateActorFromUuidBuilder_ == null) { - replicateActorFromUuidBuilder_ = new com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.UuidProtocol, akka.remote.RemoteProtocol.UuidProtocol.Builder, akka.remote.RemoteProtocol.UuidProtocolOrBuilder>( - replicateActorFromUuid_, - getParentForChildren(), - isClean()); - replicateActorFromUuid_ = null; - } - return replicateActorFromUuidBuilder_; - } - - // optional string supervisor = 5; - private java.lang.Object supervisor_ = ""; - public boolean hasSupervisor() { - return ((bitField0_ & 0x00000010) == 0x00000010); - } - public String getSupervisor() { - java.lang.Object ref = supervisor_; - if (!(ref instanceof String)) { - String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - supervisor_ = s; - return s; - } else { - return (String) ref; - } - } - public Builder setSupervisor(String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000010; - supervisor_ = value; - onChanged(); - return this; - } - public Builder clearSupervisor() { - bitField0_ = (bitField0_ & ~0x00000010); - supervisor_ = getDefaultInstance().getSupervisor(); - onChanged(); - return this; - } - void setSupervisor(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000010; - supervisor_ = value; - onChanged(); - } - - // @@protoc_insertion_point(builder_scope:RemoteSystemDaemonMessageProtocol) - } - - static { - defaultInstance = new RemoteSystemDaemonMessageProtocol(true); - defaultInstance.initFields(); - } - - // @@protoc_insertion_point(class_scope:RemoteSystemDaemonMessageProtocol) - } - public interface DurableMailboxMessageProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -6692,11 +5418,6 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_MessageProtocol_fieldAccessorTable; - private static com.google.protobuf.Descriptors.Descriptor - internal_static_UuidProtocol_descriptor; - private static - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_UuidProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_MetadataEntryProtocol_descriptor; private static @@ -6712,11 +5433,6 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_ExceptionProtocol_fieldAccessorTable; - private static com.google.protobuf.Descriptors.Descriptor - internal_static_RemoteSystemDaemonMessageProtocol_descriptor; - private static - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_DurableMailboxMessageProtocol_descriptor; private static @@ -6744,32 +5460,19 @@ public final class RemoteProtocol { "n\030\003 \001(\0132\020.AddressProtocol\" \n\020ActorRefPro" + "tocol\022\014\n\004path\030\001 \002(\t\";\n\017MessageProtocol\022\017" + "\n\007message\030\001 \002(\014\022\027\n\017messageManifest\030\002 \001(\014" + - "\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 " + - "\002(\004\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(" + - "\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressProtocol\022\016\n\006s" + - "ystem\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 " + - "\002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 " + - "\002(\t\022\017\n\007message\030\002 \002(\t\"\277\001\n!RemoteSystemDae" + - "monMessageProtocol\0223\n\013messageType\030\001 \002(\0162", - "\036.RemoteSystemDaemonMessageType\022\021\n\tactor" + - "Path\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026replicate" + - "ActorFromUuid\030\004 \001(\0132\r.UuidProtocol\022\022\n\nsu" + - "pervisor\030\005 \001(\t\"y\n\035DurableMailboxMessageP" + - "rotocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRefPro" + - "tocol\022!\n\006sender\030\002 \001(\0132\021.ActorRefProtocol" + - "\022\017\n\007message\030\003 \002(\014*(\n\013CommandType\022\013\n\007CONN" + - "ECT\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026ReplicationStorag" + - "eType\022\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020" + - "\002\022\r\n\tDATA_GRID\020\003*>\n\027ReplicationStrategyT", - "ype\022\021\n\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002" + - "*\241\002\n\035RemoteSystemDaemonMessageType\022\010\n\004ST" + - "OP\020\001\022\007\n\003USE\020\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAIL" + - "ABLE\020\004\022\024\n\020MAKE_UNAVAILABLE\020\005\022\016\n\nDISCONNE" + - "CT\020\006\022\r\n\tRECONNECT\020\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSI" + - "P\020\t\022\031\n\025FAIL_OVER_CONNECTIONS\020\024\022\026\n\022FUNCTI" + - "ON_FUN0_UNIT\020\025\022\025\n\021FUNCTION_FUN0_ANY\020\026\022\032\n" + - "\026FUNCTION_FUN1_ARG_UNIT\020\027\022\031\n\025FUNCTION_FU" + - "N1_ARG_ANY\020\030B\017\n\013akka.remoteH\001" + "\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r" + + "\n\005value\030\002 \002(\014\"A\n\017AddressProtocol\022\016\n\006syst" + + "em\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r" + + "\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t" + + "\022\017\n\007message\030\002 \002(\t\"y\n\035DurableMailboxMessa" + + "geProtocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRef" + + "Protocol\022!\n\006sender\030\002 \001(\0132\021.ActorRefProto", + "col\022\017\n\007message\030\003 \002(\014*(\n\013CommandType\022\013\n\007C" + + "ONNECT\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026ReplicationSto" + + "rageType\022\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_L" + + "OG\020\002\022\r\n\tDATA_GRID\020\003*>\n\027ReplicationStrate" + + "gyType\022\021\n\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIN" + + "D\020\002B\017\n\013akka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6816,16 +5519,8 @@ public final class RemoteProtocol { new java.lang.String[] { "Message", "MessageManifest", }, akka.remote.RemoteProtocol.MessageProtocol.class, akka.remote.RemoteProtocol.MessageProtocol.Builder.class); - internal_static_UuidProtocol_descriptor = - getDescriptor().getMessageTypes().get(5); - internal_static_UuidProtocol_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_UuidProtocol_descriptor, - new java.lang.String[] { "High", "Low", }, - akka.remote.RemoteProtocol.UuidProtocol.class, - akka.remote.RemoteProtocol.UuidProtocol.Builder.class); internal_static_MetadataEntryProtocol_descriptor = - getDescriptor().getMessageTypes().get(6); + getDescriptor().getMessageTypes().get(5); internal_static_MetadataEntryProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MetadataEntryProtocol_descriptor, @@ -6833,7 +5528,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.MetadataEntryProtocol.class, akka.remote.RemoteProtocol.MetadataEntryProtocol.Builder.class); internal_static_AddressProtocol_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(6); internal_static_AddressProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddressProtocol_descriptor, @@ -6841,23 +5536,15 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.AddressProtocol.class, akka.remote.RemoteProtocol.AddressProtocol.Builder.class); internal_static_ExceptionProtocol_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(7); internal_static_ExceptionProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ExceptionProtocol_descriptor, new java.lang.String[] { "Classname", "Message", }, akka.remote.RemoteProtocol.ExceptionProtocol.class, akka.remote.RemoteProtocol.ExceptionProtocol.Builder.class); - internal_static_RemoteSystemDaemonMessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(9); - internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_RemoteSystemDaemonMessageProtocol_descriptor, - new java.lang.String[] { "MessageType", "ActorPath", "Payload", "ReplicateActorFromUuid", "Supervisor", }, - akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.class, - akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.Builder.class); internal_static_DurableMailboxMessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(8); internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DurableMailboxMessageProtocol_descriptor, diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index f7763ba2cc..3f54b5a633 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -76,14 +76,6 @@ message MessageProtocol { optional bytes messageManifest = 2; } -/** - * Defines a UUID. - */ -message UuidProtocol { - required uint64 high = 1; - required uint64 low = 2; -} - /** * Defines a meta data entry. */ @@ -109,37 +101,6 @@ message ExceptionProtocol { required string message = 2; } -/** - * Defines the remote system daemon message. - */ -message RemoteSystemDaemonMessageProtocol { - required RemoteSystemDaemonMessageType messageType = 1; - optional string actorPath = 2; - optional bytes payload = 3; - optional UuidProtocol replicateActorFromUuid = 4; - optional string supervisor = 5; -} - -/** - * Defines the remote system daemon message type. - */ -enum RemoteSystemDaemonMessageType { - STOP = 1; - USE = 2; - RELEASE = 3; - MAKE_AVAILABLE = 4; - MAKE_UNAVAILABLE = 5; - DISCONNECT = 6; - RECONNECT = 7; - RESIGN = 8; - GOSSIP = 9; - FAIL_OVER_CONNECTIONS = 20; - FUNCTION_FUN0_UNIT = 21; - FUNCTION_FUN0_ANY = 22; - FUNCTION_FUN1_ARG_UNIT = 23; - FUNCTION_FUN1_ARG_ANY = 24; -} - /** * Defines the durable mailbox message. */ diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 121ead58bc..a12e5ecab1 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -8,8 +8,6 @@ import akka.actor._ import akka.actor.Status._ import akka.event.Logging import akka.util.Duration -import akka.remote.RemoteProtocol._ -import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import akka.config.ConfigurationException import akka.serialization.SerializationExtension @@ -253,7 +251,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { try { val t = remoteSettings.RemoteSystemDaemonAckTimeout - Await.result(connection ? (toRemoteMessage(newGossip), t), t) match { + Await.result(connection ? (newGossip, t), t) match { case Success(receiver) ⇒ log.debug("Gossip sent to [{}] was successfully received", receiver) case Failure(cause) ⇒ log.error(cause, cause.toString) } @@ -308,19 +306,6 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { from copy (version = newVersion) } - private def toRemoteMessage(gossip: Gossip): RemoteProtocol.RemoteSystemDaemonMessageProtocol = { - val gossipAsBytes = serialization.serialize(gossip) match { - case Left(error) ⇒ throw error - case Right(bytes) ⇒ bytes - } - - RemoteSystemDaemonMessageProtocol.newBuilder - .setMessageType(GOSSIP) - .setActorPath(remote.remoteDaemon.path.toString) - .setPayload(ByteString.copyFrom(gossipAsBytes)) - .build() - } - private def latestVersionOf(newGossip: Gossip, oldGossip: Gossip): Gossip = { (newGossip.version compare oldGossip.version) match { case VectorClock.After ⇒ newGossip // gossiped version is newer, use new version diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index d9de67c473..304d82e0eb 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -11,8 +11,6 @@ import akka.util._ import akka.util.duration._ import akka.util.Helpers._ import akka.serialization.Compression.LZF -import akka.remote.RemoteProtocol._ -import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import java.net.InetSocketAddress import com.eaio.uuid.UUID import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression, SerializationExtension } @@ -21,6 +19,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.TimeUnit.MILLISECONDS import akka.dispatch.SystemMessage import scala.annotation.tailrec +import akka.remote.RemoteProtocol.{ ActorRefProtocol, AkkaRemoteProtocol, RemoteControlProtocol, RemoteMessageProtocol } /** * Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc. @@ -115,6 +114,10 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti } } +sealed trait DaemonMsg +case class DaemonMsgCreate(factory: () ⇒ Actor, path: String, supervisor: ActorRef) extends DaemonMsg +case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg + /** * Internal system "daemon" actor for remote internal communication. * @@ -149,140 +152,39 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa } override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { - case message: RemoteSystemDaemonMessageProtocol ⇒ - log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message.getMessageType, remote.remoteSettings.NodeName) + case message: DaemonMsg ⇒ + log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName) + message match { + case DaemonMsgCreate(factory, path, supervisor) ⇒ + import remote.remoteAddress + implicit val t = remote.transports - message.getMessageType match { - case USE ⇒ handleUse(message) - case RELEASE ⇒ handleRelease(message) - // case STOP ⇒ cluster.shutdown() - // case DISCONNECT ⇒ cluster.disconnect() - // case RECONNECT ⇒ cluster.reconnect() - // case RESIGN ⇒ cluster.resign() - // case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message) - case GOSSIP ⇒ handleGossip(message) - // case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message) - // case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message, sender) - // case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message) - // case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message, sender) - case unknown ⇒ log.warning("Unknown message type {} received by {}", unknown, this) + path match { + case ParsedActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote" ⇒ + // TODO RK canonicalize path so as not to duplicate it always #1446 + val subpath = elems.drop(1) + val path = remote.remoteDaemon.path / subpath + val actor = system.provider.actorOf(system, + Props(creator = factory), + supervisor.asInstanceOf[InternalActorRef], + path, true, None) + addChild(subpath.mkString("/"), actor) + system.deathWatch.subscribe(this, actor) + case _ ⇒ + log.error("remote path does not match path from message [{}]", message) + } + case DaemonMsgWatch(watcher, watched) ⇒ + val other = system.actorFor(watcher.path.root / "remote") + system.deathWatch.subscribe(other, watched) } - case Terminated(child) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) + case Terminated(child: LocalActorRef) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) - case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + case t: Terminated ⇒ system.deathWatch.publish(t) + + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } - def handleUse(message: RemoteSystemDaemonMessageProtocol) { - - if (!message.hasActorPath || !message.hasSupervisor) log.error("Ignoring incomplete USE command [{}]", message) - else { - - val actorFactoryBytes = - if (remote.remoteSettings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) - else message.getPayload.toByteArray - - val actorFactory = - remote.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { - case Left(error) ⇒ throw error - case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor] - } - - import remote.remoteAddress - implicit val t = remote.transports - - message.getActorPath match { - case ParsedActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote" ⇒ - // TODO RK canonicalize path so as not to duplicate it always #1446 - val subpath = elems.drop(1) - val path = remote.remoteDaemon.path / subpath - val supervisor = system.actorFor(message.getSupervisor).asInstanceOf[InternalActorRef] - val actor = system.provider.actorOf(system, Props(creator = actorFactory), supervisor, path, true, None) - addChild(subpath.mkString("/"), actor) - system.deathWatch.subscribe(this, actor) - case _ ⇒ - log.error("remote path does not match path from message [{}]", message) - } - } - } - - // FIXME implement handleRelease - def handleRelease(message: RemoteSystemDaemonMessageProtocol) { - } - - def handleGossip(message: RemoteSystemDaemonMessageProtocol) { - // try { - // val gossip = serialization.deserialize(message.getPayload.toByteArray, classOf[Gossip], None) match { - // case Left(error) ⇒ throw error - // case Right(instance) ⇒ instance.asInstanceOf[Gossip] - // } - - // gossiper tell gossip - - // sender ! Success(address.toString) - // } catch { - // case error: Throwable ⇒ - // sender ! Failure(error) - // throw error - // } - } - - /* - * generate name for temporary actor refs - */ - // private val tempNumber = new AtomicLong - // def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement()) - // def tempPath = remote.remoteDaemon.path / tempName - // - // // FIXME: handle real remote supervision, ticket #1408 - // def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) { - // new LocalActorRef(remote.system, - // Props( - // context ⇒ { - // case f: Function0[_] ⇒ try { f() } finally { context.self.stop() } - // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) - // } - // - // // FIXME: handle real remote supervision, ticket #1408 - // def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) { - // implicit val s = sender - // new LocalActorRef(remote.system, - // Props( - // context ⇒ { - // case f: Function0[_] ⇒ try { context.sender ! f() } finally { context.self.stop() } - // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Any]]) - // } - // - // // FIXME: handle real remote supervision, ticket #1408 - // def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) { - // new LocalActorRef(remote.system, - // Props( - // context ⇒ { - // case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() } - // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) - // } - // - // // FIXME: handle real remote supervision, ticket #1408 - // def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol, sender: ActorRef) { - // implicit val s = sender - // new LocalActorRef(remote.system, - // Props( - // context ⇒ { - // case (fun: Function[_, _], param: Any) ⇒ try { context.sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() } - // }).copy(dispatcher = remote.computeGridDispatcher), remote.remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) - // } - - def handleFailover(message: RemoteSystemDaemonMessageProtocol) { - // val (from, to) = payloadFor(message, classOf[(InetSocketremoteDaemonServiceName, InetSocketremoteDaemonServiceName)]) - // cluster.failOverClusterActorRefConnections(from, to) - } - - private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = { - remote.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { - case Left(error) ⇒ throw error - case Right(instance) ⇒ instance.asInstanceOf[T] - } - } } class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader] = None) { @@ -350,14 +252,13 @@ trait RemoteMarshallingOps { remoteMessage.recipient match { case `remoteDaemon` ⇒ remoteMessage.payload match { - case m: RemoteSystemDaemonMessageProtocol ⇒ - implicit val timeout = system.settings.ActorTimeout + case m @ (_: DaemonMsg | _: Terminated) ⇒ try remoteDaemon ! m catch { - case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m.getMessageType(), remoteMessage.sender) + case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender) } case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender) } - case l @ (_: LocalActorRef | _: MinimalActorRef) ⇒ + case l: LocalRef ⇒ remoteMessage.payload match { case msg: SystemMessage ⇒ if (useUntrustedMode) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 99524ad3b6..89ae932cbf 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -11,8 +11,6 @@ import akka.util.Timeout import akka.config.ConfigurationException import akka.event.{ DeathWatch, Logging } import akka.serialization.Compression.LZF -import akka.remote.RemoteProtocol._ -import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import com.google.protobuf.ByteString import akka.event.EventStream import akka.dispatch.Promise @@ -33,7 +31,6 @@ class RemoteActorRefProvider( val remoteSettings = new RemoteSettings(settings.config, systemName) - def deathWatch = local.deathWatch def rootGuardian = local.rootGuardian def guardian = local.guardian def systemGuardian = local.systemGuardian @@ -51,6 +48,8 @@ class RemoteActorRefProvider( private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) + val deathWatch = new RemoteDeathWatch(local.deathWatch, this) + def init(system: ActorSystemImpl) { local.init(system) remote.init(system, this) @@ -147,24 +146,15 @@ class RemoteActorRefProvider( def useActorOnNode(path: ActorPath, actorFactory: () ⇒ Actor, supervisor: ActorRef) { log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path) - val actorFactoryBytes = - remote.serialization.serialize(actorFactory) match { - case Left(error) ⇒ throw error - case Right(bytes) ⇒ if (remoteSettings.ShouldCompressData) LZF.compress(bytes) else bytes - } - - val command = RemoteSystemDaemonMessageProtocol.newBuilder - .setMessageType(USE) - .setActorPath(path.toString) - .setPayload(ByteString.copyFrom(actorFactoryBytes)) - .setSupervisor(supervisor.path.toString) - .build() - // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor - actorFor(RootActorPath(path.address) / "remote") ! command + actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(actorFactory, path.toString, supervisor) } } +trait RemoteRef extends ActorRefScope { + final def isLocal = false +} + /** * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. * This reference is network-aware (remembers its origin) and immutable. @@ -175,7 +165,7 @@ private[akka] class RemoteActorRef private[akka] ( val path: ActorPath, val getParent: InternalActorRef, loader: Option[ClassLoader]) - extends InternalActorRef { + extends InternalActorRef with RemoteRef { def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream @@ -217,3 +207,25 @@ private[akka] class RemoteActorRef private[akka] ( @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = SerializedActorRef(path.toString) } + +class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch { + + def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match { + case r: RemoteRef ⇒ + val ret = local.subscribe(watcher, watched) + provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched) + ret + case l: LocalRef ⇒ + local.subscribe(watcher, watched) + case _ ⇒ + provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass) + false + } + + def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched) + + def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher) + + def publish(event: Terminated): Unit = local.publish(event) + +} diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index c5fcd9e276..df4df415d4 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -60,6 +60,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { + log.debug("Sending message: {}", message) send(remoteSupport.createRemoteMessageProtocolBuilder(recipient, message, senderOption).build) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) @@ -70,9 +71,7 @@ abstract class RemoteClient private[akka] ( /** * Sends the message across the wire */ - def send(request: RemoteMessageProtocol): Unit = { - log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport.system)) - + private def send(request: RemoteMessageProtocol): Unit = { try { val payload = remoteSupport.createMessageSendEnvelope(request) currentChannel.write(payload).addListener( diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala new file mode 100644 index 0000000000..b51720aa01 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.remote + +import akka.testkit._ +import akka.actor.{ ActorSystem, DeathWatchSpec } +import com.typesafe.config.ConfigFactory + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString(""" +akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + deployment { + /watchers.remote = "akka://other@127.0.0.1:2666" + } + } + cluster.nodename = buh + remote.server { + hostname = "127.0.0.1" + port = 2665 + } +} +""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec { + + val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.server.port=2666").withFallback(system.settings.config)) + + override def atTermination() { + other.shutdown() + } + +}