From ae0cb4a756df8f13dfe2cce94a1ac39a690c0c2f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 5 Apr 2013 16:12:45 +0200 Subject: [PATCH] Add dispatcher to deployment config, see #2839 --- .../test/scala/akka/actor/DeployerSpec.scala | 20 +++- .../actor/LocalActorRefProviderSpec.scala | 9 +- .../akka/actor/dispatch/DispatchersSpec.scala | 33 +++++- akka-actor/src/main/resources/reference.conf | 24 ++-- .../scala/akka/actor/ActorRefProvider.scala | 20 +++- .../src/main/scala/akka/actor/Deployer.scala | 16 ++- .../src/main/resources/reference.conf | 2 +- .../akka/cluster/ClusterDeployerSpec.scala | 7 +- .../dispatcher/DispatcherDocTestBase.java | 30 +++-- akka-docs/rst/java/dispatchers.rst | 30 +++-- .../docs/dispatcher/DispatcherDocSpec.scala | 22 +++- akka-docs/rst/scala/dispatchers.rst | 30 +++-- .../main/java/akka/remote/RemoteProtocol.java | 104 +++++++++++++++++- .../src/main/protocol/RemoteProtocol.proto | 1 + .../akka/remote/RemoteActorRefProvider.scala | 2 +- .../DaemonMsgCreateSerializer.scala | 8 +- .../akka/remote/RemoteDeployerSpec.scala | 5 +- .../DaemonMsgCreateSerializerSpec.scala | 6 +- 18 files changed, 293 insertions(+), 76 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index c49e9e4a62..d4760b241c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -25,6 +25,9 @@ object DeployerSpec { # nr-of-instances ignored when router = from-code nr-of-instances = 2 } + /service3 { + dispatcher = my-dispatcher + } /service-round-robin { router = round-robin } @@ -70,14 +73,14 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { "be able to parse 'akka.actor.deployment._' with all default values" in { val service = "/service1" val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1)) - deployment must be('defined) deployment must be(Some( Deploy( service, deployment.get.config, NoRouter, - NoScopeGiven))) + NoScopeGiven, + Deploy.NoDispatcherGiven))) } "use None deployment for undefined service" in { @@ -86,6 +89,19 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { deployment must be(None) } + "be able to parse 'akka.actor.deployment._' with dispatcher config" in { + val service = "/service3" + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1)) + + deployment must be(Some( + Deploy( + service, + deployment.get.config, + NoRouter, + NoScopeGiven, + dispatcher = "my-dispatcher"))) + } + "detect invalid number-of-instances" in { intercept[com.typesafe.config.ConfigException.WrongType] { val invalidDeployerConf = ConfigFactory.parseString(""" diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 8fc765f9d8..dae82a5c5f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -66,12 +66,13 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi }))) a.tell(GetChild, testActor) val child = expectMsgType[ActorRef] - child.asInstanceOf[LocalActorRef].underlying.props must be theSameInstanceAs Props.empty + val childProps1 = child.asInstanceOf[LocalActorRef].underlying.props + childProps1 must be(Props.empty) system stop a expectMsgType[Terminated] - val childProps = child.asInstanceOf[LocalActorRef].underlying.props - childProps must not be theSameInstanceAs(Props.empty) - childProps must be theSameInstanceAs ActorCell.terminatedProps + val childProps2 = child.asInstanceOf[LocalActorRef].underlying.props + childProps2 must not be theSameInstanceAs(childProps1) + childProps2 must be theSameInstanceAs ActorCell.terminatedProps } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala index 6c39655706..b4ec85b44f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala @@ -4,7 +4,6 @@ package akka.actor.dispatch import language.postfixOps - import java.util.concurrent.{ CountDownLatch, TimeUnit } import scala.reflect.ClassTag import akka.dispatch._ @@ -15,6 +14,7 @@ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.Props import scala.concurrent.duration._ +import akka.actor.ActorRef object DispatchersSpec { val config = """ @@ -33,6 +33,14 @@ object DispatchersSpec { type = BalancingDispatcher } } + akka.actor.deployment { + /echo1 { + dispatcher = myapp.mydispatcher + } + /echo2 { + dispatcher = myapp.mydispatcher + } + } """ class ThreadNameEcho extends Actor { @@ -73,6 +81,14 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend withFallback(defaultDispatcherConfig)))).toMap } + def assertMyDispatcherIsUsed(actor: ActorRef): Unit = { + actor ! "what's the name?" + val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)".r + expectMsgPF(remaining) { + case Expected(x) ⇒ + } + } + "Dispatchers" must { "use defined properties" in { @@ -115,11 +131,7 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend } "include system name and dispatcher id in thread names for fork-join-executor" in { - system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.mydispatcher")) ! "what's the name?" - val Expected = "(DispatchersSpec-myapp.mydispatcher-[1-9][0-9]*)".r - expectMsgPF(remaining) { - case Expected(x) ⇒ - } + assertMyDispatcherIsUsed(system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.mydispatcher"))) } "include system name and dispatcher id in thread names for thread-pool-executor" in { @@ -154,6 +166,15 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) with ImplicitSend } } + "use dispatcher in deployment config" in { + assertMyDispatcherIsUsed(system.actorOf(Props[ThreadNameEcho], name = "echo1")) + } + + "use dispatcher in deployment config, trumps code" in { + assertMyDispatcherIsUsed( + system.actorOf(Props[ThreadNameEcho].withDispatcher("myapp.my-pinned-dispatcher"), name = "echo2")) + } + } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 5a044d941d..249cef2b63 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -93,6 +93,12 @@ akka { # deployment id pattern - on the format: /parent/child etc. default { + # The id of the dispatcher to use for this actor. + # If undefined or empty the dispatcher specified in code + # (Props.withDispatcher) is used, or default-dispatcher if not + # specified at all. + dispatcher = "" + # routing (load-balance) scheme to use # - available: "from-code", "round-robin", "random", "smallest-mailbox", # "scatter-gather", "broadcast" @@ -412,28 +418,28 @@ akka { } tcp { - + # The number of selectors to stripe the served channels over; each of # these will use one select loop on the selector-dispatcher. nr-of-selectors = 1 - + # Maximum number of open channels supported by this TCP module; there is - # no intrinsic general limit, this setting is meant to enable DoS + # no intrinsic general limit, this setting is meant to enable DoS # protection by limiting the number of concurrently connected clients. # Also note that this is a "soft" limit; in certain cases the implementation # will accept a few connections more or a few less than the number configured # here. Must be an integer > 0 or "unlimited". max-channels = 256000 - + # The select loop can be used in two modes: # - setting "infinite" will select without a timeout, hogging a thread - # - setting a positive timeout will do a bounded select call, + # - setting a positive timeout will do a bounded select call, # enabling sharing of a single thread between multiple selectors # (in this case you will have to use a different configuration for the # selector-dispatcher, e.g. using "type=Dispatcher" with size 1) # - setting it to zero means polling, i.e. calling selectNow() select-timeout = infinite - + # When trying to assign a new connection to a selector and the chosen # selector is at full capacity, retry selector choosing and assignment # this many times before giving up @@ -469,11 +475,11 @@ akka { # Fully qualified config path which holds the dispatcher configuration # to be used for running the select() calls in the selectors selector-dispatcher = "akka.io.pinned-dispatcher" - + # Fully qualified config path which holds the dispatcher configuration # for the read/write worker actors worker-dispatcher = "akka.actor.default-dispatcher" - + # Fully qualified config path which holds the dispatcher configuration # for the selector management actors management-dispatcher = "akka.actor.default-dispatcher" @@ -603,7 +609,7 @@ akka { # IMPORTANT NOTICE: - # + # # The following settings belong to the deprecated akka.actor.IO # implementation and will be removed once that is removed. They are not # taken into account by the akka.io.* implementation, which is configured diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index be1b6a7fa1..9c7e5a8fe3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -589,11 +589,23 @@ private[akka] class LocalActorRefProvider private[akka] ( systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = { props.routerConfig match { case NoRouter ⇒ - if (settings.DebugRouterMisconfiguration && deployer.lookup(path).isDefined) - log.warning("Configuration says that {} should be a router, but code disagrees. Remove the config or add a routerConfig to its Props.") + if (settings.DebugRouterMisconfiguration) { + deployer.lookup(path) foreach { d ⇒ + if (d.routerConfig != NoRouter) + log.warning("Configuration says that [{}] should be a router, but code disagrees. Remove the config or add a routerConfig to its Props.", path) + } + } + + val props2 = + if (lookupDeploy) deployer.lookup(path) match { + case Some(d) if d.dispatcher != Deploy.NoDispatcherGiven ⇒ props.withDispatcher(d.dispatcher) + case _ ⇒ props // dispatcher not specified in deployment config + } + else props + + if (async) new RepointableActorRef(system, props2, supervisor, path).initialize(async) + else new LocalActorRef(system, props2, supervisor, path) - if (async) new RepointableActorRef(system, props, supervisor, path).initialize(async) - else new LocalActorRef(system, props, supervisor, path) case router ⇒ val lookup = if (lookupDeploy) deployer.lookup(path) else None val fromProps = Iterator(props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router)) diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 310032e06d..a9a93c4a88 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -13,6 +13,10 @@ import akka.util.WildcardTree import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec +object Deploy { + final val NoDispatcherGiven = "" +} + /** * This class represents deployment configuration for a given actor path. It is * marked final in order to guarantee stable merge semantics (i.e. what @@ -33,7 +37,8 @@ final case class Deploy( path: String = "", config: Config = ConfigFactory.empty, routerConfig: RouterConfig = NoRouter, - scope: Scope = NoScopeGiven) { + scope: Scope = NoScopeGiven, + dispatcher: String = Deploy.NoDispatcherGiven) { /** * Java API to create a Deploy with the given RouterConfig @@ -55,8 +60,10 @@ final case class Deploy( * precedence. The “path” of the other Deploy is not taken into account. All * other members are merged using ``.withFallback(other.)``. */ - def withFallback(other: Deploy): Deploy = - Deploy(path, config.withFallback(other.config), routerConfig.withFallback(other.routerConfig), scope.withFallback(other.scope)) + def withFallback(other: Deploy): Deploy = { + val disp = if (dispatcher == Deploy.NoDispatcherGiven) other.dispatcher else dispatcher + Deploy(path, config.withFallback(other.config), routerConfig.withFallback(other.routerConfig), scope.withFallback(other.scope), disp) + } } /** @@ -141,7 +148,8 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce def parseConfig(key: String, config: Config): Option[Deploy] = { val deployment = config.withFallback(default) val router = createRouterConfig(deployment.getString("router"), key, config, deployment) - Some(Deploy(key, deployment, router, NoScopeGiven)) + val dispatcher = deployment.getString("dispatcher") + Some(Deploy(key, deployment, router, NoScopeGiven, dispatcher)) } /** diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 1f5c5cebe9..1100943ec2 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -218,7 +218,7 @@ akka { # when routees-path is defined. routees-path = "" - # Use members with specified role, or all members if undefined. + # Use members with specified role, or all members if undefined or empty. use-role = "" } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 3c40f1df6e..74c839ae80 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -22,6 +22,7 @@ object ClusterDeployerSpec { cluster.allow-local-routees = off } /user/service2 { + dispatcher = mydispatcher router = round-robin nr-of-instances = 20 cluster.enabled = on @@ -54,7 +55,8 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { deployment.get.config, ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( totalInstances = 20, maxInstancesPerNode = 3, allowLocalRoutees = false, useRole = None)), - ClusterScope))) + ClusterScope, + Deploy.NoDispatcherGiven))) } "be able to parse 'akka.actor.deployment._' with specified cluster deploy routee settings" in { @@ -68,7 +70,8 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { deployment.get.config, ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings( totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false, useRole = None)), - ClusterScope))) + ClusterScope, + "mydispatcher"))) } } diff --git a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java index 6259d9a9aa..d24d2da445 100644 --- a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTestBase.java @@ -30,8 +30,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; //#imports-custom -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import scala.Option; import scala.concurrent.ExecutionContext; @@ -43,27 +43,37 @@ import akka.testkit.AkkaSpec; public class DispatcherDocTestBase { - ActorSystem system; + static ActorSystem system; - @Before - public void setUp() { + @BeforeClass + public static void beforeAll() { system = ActorSystem.create("MySystem", ConfigFactory.parseString( DispatcherDocSpec.config()).withFallback(AkkaSpec.testConf())); } - @After - public void tearDown() { + @AfterClass + public static void afterAll() { system.shutdown(); + system = null; } @Test - public void defineDispatcher() { - //#defining-dispatcher + public void defineDispatcherInConfig() { + //#defining-dispatcher-in-config + ActorRef myActor = + system.actorOf(new Props(MyUntypedActor.class), + "myactor"); + //#defining-dispatcher-in-config + } + + @Test + public void defineDispatcherInCode() { + //#defining-dispatcher-in-code ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor3"); - //#defining-dispatcher + //#defining-dispatcher-in-code } @Test diff --git a/akka-docs/rst/java/dispatchers.rst b/akka-docs/rst/java/dispatchers.rst index fb23bd6ef1..8d1888c5f4 100644 --- a/akka-docs/rst/java/dispatchers.rst +++ b/akka-docs/rst/java/dispatchers.rst @@ -25,16 +25,8 @@ Dispatchers implement the :class:`ExecutionContext` interface and can thus be us Setting the dispatcher for an Actor ----------------------------------- -So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is: - -.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher - -.. note:: - The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration. - So in this example it's a top-level section, but you could for instance put it as a sub-section, - where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` - -And then you just need to configure that dispatcher in your configuration: +So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is +is to configure the dispatcher: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config @@ -44,6 +36,24 @@ And here's another example that uses the "thread-pool-executor": For more options, see the default-dispatcher section of the :ref:`configuration`. +Then you create the actor as usual and define the dispatcher in the deployment configuration. + +.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher-in-config + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#dispatcher-deployment-config + +An alternative to the deployment configuration is to define the dispatcher in code. +If you define the ``dispatcher`` in the deployment configuration then this value will be used instead +of programmatically provided parameter. + +.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher-in-code + +.. note:: + The dispatcher you specify in ``withDispatcher`` and the ``dispatcher`` property in the deployment + configuration is in fact a path into your configuration. + So in this example it's a top-level section, but you could for instance put it as a sub-section, + where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` + Types of dispatchers -------------------- diff --git a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala index b4a92e261a..634e7b4f84 100644 --- a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -103,6 +103,14 @@ object DispatcherDocSpec { //Other dispatcher configuration goes here } //#prio-dispatcher-config-java + + //#dispatcher-deployment-config + akka.actor.deployment { + /myactor { + dispatcher = my-dispatcher + } + } + //#dispatcher-deployment-config """ //#prio-mailbox @@ -165,13 +173,21 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { import DispatcherDocSpec.MyActor - "defining dispatcher" in { + "defining dispatcher in config" in { val context = system - //#defining-dispatcher + //#defining-dispatcher-in-config + import akka.actor.Props + val myActor = context.actorOf(Props[MyActor], "myactor") + //#defining-dispatcher-in-config + } + + "defining dispatcher in code" in { + val context = system + //#defining-dispatcher-in-code import akka.actor.Props val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1") - //#defining-dispatcher + //#defining-dispatcher-in-code } "defining dispatcher with bounded queue" in { diff --git a/akka-docs/rst/scala/dispatchers.rst b/akka-docs/rst/scala/dispatchers.rst index 593b67c97f..2dad196a0e 100644 --- a/akka-docs/rst/scala/dispatchers.rst +++ b/akka-docs/rst/scala/dispatchers.rst @@ -25,16 +25,8 @@ Dispatchers implement the :class:`ExecutionContext` interface and can thus be us Setting the dispatcher for an Actor ----------------------------------- -So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is: - -.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-dispatcher - -.. note:: - The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration. - So in this example it's a top-level section, but you could for instance put it as a sub-section, - where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` - -And then you just need to configure that dispatcher in your configuration: +So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first +is to configure the dispatcher: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config @@ -44,6 +36,24 @@ And here's another example that uses the "thread-pool-executor": For more options, see the default-dispatcher section of the :ref:`configuration`. +Then you create the actor as usual and define the dispatcher in the deployment configuration. + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-dispatcher-in-config + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#dispatcher-deployment-config + +An alternative to the deployment configuration is to define the dispatcher in code. +If you define the ``dispatcher`` in the deployment configuration then this value will be used instead +of programmatically provided parameter. + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-dispatcher-in-code + +.. note:: + The dispatcher you specify in ``withDispatcher`` and the ``dispatcher`` property in the deployment + configuration is in fact a path into your configuration. + So in this example it's a top-level section, but you could for instance put it as a sub-section, + where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"`` + Types of dispatchers -------------------- diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 757c148e3a..db3de8f925 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -5805,6 +5805,10 @@ public final class RemoteProtocol { // optional bytes scope = 4; boolean hasScope(); com.google.protobuf.ByteString getScope(); + + // optional string dispatcher = 5; + boolean hasDispatcher(); + String getDispatcher(); } public static final class DeployProtocol extends com.google.protobuf.GeneratedMessage @@ -5897,11 +5901,44 @@ public final class RemoteProtocol { return scope_; } + // optional string dispatcher = 5; + public static final int DISPATCHER_FIELD_NUMBER = 5; + private java.lang.Object dispatcher_; + public boolean hasDispatcher() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getDispatcher() { + java.lang.Object ref = dispatcher_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + dispatcher_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getDispatcherBytes() { + java.lang.Object ref = dispatcher_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + dispatcher_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { path_ = ""; config_ = com.google.protobuf.ByteString.EMPTY; routerConfig_ = com.google.protobuf.ByteString.EMPTY; scope_ = com.google.protobuf.ByteString.EMPTY; + dispatcher_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5931,6 +5968,9 @@ public final class RemoteProtocol { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(4, scope_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, getDispatcherBytes()); + } getUnknownFields().writeTo(output); } @@ -5956,6 +5996,10 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, scope_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, getDispatcherBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6088,6 +6132,8 @@ public final class RemoteProtocol { bitField0_ = (bitField0_ & ~0x00000004); scope_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000008); + dispatcher_ = ""; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -6142,6 +6188,10 @@ public final class RemoteProtocol { to_bitField0_ |= 0x00000008; } result.scope_ = scope_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.dispatcher_ = dispatcher_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6170,6 +6220,9 @@ public final class RemoteProtocol { if (other.hasScope()) { setScope(other.getScope()); } + if (other.hasDispatcher()) { + setDispatcher(other.getDispatcher()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6225,6 +6278,11 @@ public final class RemoteProtocol { scope_ = input.readBytes(); break; } + case 42: { + bitField0_ |= 0x00000010; + dispatcher_ = input.readBytes(); + break; + } } } } @@ -6339,6 +6397,42 @@ public final class RemoteProtocol { return this; } + // optional string dispatcher = 5; + private java.lang.Object dispatcher_ = ""; + public boolean hasDispatcher() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getDispatcher() { + java.lang.Object ref = dispatcher_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + dispatcher_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setDispatcher(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + dispatcher_ = value; + onChanged(); + return this; + } + public Builder clearDispatcher() { + bitField0_ = (bitField0_ & ~0x00000010); + dispatcher_ = getDefaultInstance().getDispatcher(); + onChanged(); + return this; + } + void setDispatcher(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000010; + dispatcher_ = value; + onChanged(); + } + // @@protoc_insertion_point(builder_scope:DeployProtocol) } @@ -6432,11 +6526,11 @@ public final class RemoteProtocol { "psProtocol\022\022\n\ndispatcher\030\001 \002(\t\022\037\n\006deploy" + "\030\002 \002(\0132\017.DeployProtocol\022\030\n\020fromClassCrea" + "tor\030\003 \001(\t\022\017\n\007creator\030\004 \001(\014\022\024\n\014routerConf" + - "ig\030\005 \001(\014\"S\n\016DeployProtocol\022\014\n\004path\030\001 \002(\t" + + "ig\030\005 \001(\014\"g\n\016DeployProtocol\022\014\n\004path\030\001 \002(\t" + "\022\016\n\006config\030\002 \001(\014\022\024\n\014routerConfig\030\003 \001(\014\022\r" + - "\n\005scope\030\004 \001(\014*7\n\013CommandType\022\013\n\007CONNECT\020" + - "\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013akka.r" + - "emoteH\001" + "\n\005scope\030\004 \001(\014\022\022\n\ndispatcher\030\005 \001(\t*7\n\013Com" + + "mandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tH" + + "EARTBEAT\020\003B\017\n\013akka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6520,7 +6614,7 @@ public final class RemoteProtocol { internal_static_DeployProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DeployProtocol_descriptor, - new java.lang.String[] { "Path", "Config", "RouterConfig", "Scope", }, + new java.lang.String[] { "Path", "Config", "RouterConfig", "Scope", "Dispatcher", }, akka.remote.RemoteProtocol.DeployProtocol.class, akka.remote.RemoteProtocol.DeployProtocol.Builder.class); return null; diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 39c19f5a2f..76c649dd00 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -110,4 +110,5 @@ message DeployProtocol { optional bytes config = 2; optional bytes routerConfig = 3; optional bytes scope = 4; + optional string dispatcher = 5; } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index bf778046d1..566b905ef4 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -233,7 +233,7 @@ private[akka] class RemoteActorRefProvider( } Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match { - case d @ Deploy(_, _, _, RemoteScope(addr)) ⇒ + case d @ Deploy(_, _, _, RemoteScope(addr), _) ⇒ if (hasAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else { diff --git a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala index 3d098b0712..ad9e7128e9 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala @@ -28,6 +28,7 @@ import util.{ Failure, Success } private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends Serializer { import ProtobufSerializer.serializeActorRef import ProtobufSerializer.deserializeActorRef + import Deploy.NoDispatcherGiven def includeManifest: Boolean = false def identifier = 3 @@ -44,6 +45,8 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e builder.setRouterConfig(serialize(d.routerConfig)) if (d.scope != NoScopeGiven) builder.setScope(serialize(d.scope)) + if (d.dispatcher != NoDispatcherGiven) + builder.setDispatcher(d.dispatcher) builder.build } @@ -85,7 +88,10 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e val scope = if (protoDeploy.hasScope) deserialize(protoDeploy.getScope, classOf[Scope]) else NoScopeGiven - Deploy(protoDeploy.getPath, config, routerConfig, scope) + val dispatcher = + if (protoDeploy.hasDispatcher) protoDeploy.getDispatcher + else NoDispatcherGiven + Deploy(protoDeploy.getPath, config, routerConfig, scope, dispatcher) } def props = { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 609621dfc2..5f524029f0 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -16,6 +16,7 @@ object RemoteDeployerSpec { router = round-robin nr-of-instances = 3 remote = "akka://sys@wallace:2552" + dispatcher = mydispatcher } } akka.remote.netty.tcp.port = 0 @@ -35,14 +36,14 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { "be able to parse 'akka.actor.deployment._' with specified remote nodes" in { val service = "/user/service2" val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1)) - deployment must be('defined) deployment must be(Some( Deploy( service, deployment.get.config, RoundRobinRouter(3), - RemoteScope(Address("akka", "sys", "wallace", 2552))))) + RemoteScope(Address("akka", "sys", "wallace", 2552)), + "mydispatcher"))) } } diff --git a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala index e516b7f53e..ea95aef975 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala @@ -65,12 +65,14 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec { path = "path1", config = ConfigFactory.parseString("a=1"), routerConfig = RoundRobinRouter(nrOfInstances = 5, supervisorStrategy = supervisorStrategy), - scope = RemoteScope(Address("akka", "Test", "host1", 1921))) + scope = RemoteScope(Address("akka", "Test", "host1", 1921)), + dispatcher = "mydispatcher") val deploy2 = Deploy( path = "path2", config = ConfigFactory.parseString("a=2"), routerConfig = FromConfig, - scope = RemoteScope(Address("akka", "Test", "host2", 1922))) + scope = RemoteScope(Address("akka", "Test", "host2", 1922)), + dispatcher = Deploy.NoDispatcherGiven) DaemonMsgCreate( props = Props[MyActor].withDispatcher("my-disp").withDeploy(deploy1), deploy = deploy2,