From b8594d475ed7c6137ea0a0b8de5ff42f6402796f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 7 May 2015 11:01:59 +0200 Subject: [PATCH] !cto #17454 Introduce DistributedPubSubSettings * rename DistributedPubSubExtension to DistributedPubSub --- .../src/main/resources/reference.conf | 2 +- .../akka/cluster/client/ClusterClient.scala | 4 +- .../pubsub/DistributedPubSubMediator.scala | 177 +++++++++++------- .../cluster/client/ClusterClientSpec.scala | 2 +- .../DistributedPubSubMediatorSpec.scala | 8 +- .../pubsub/DistributedPubSubMediatorTest.java | 12 +- .../DistributedPubSubMediatorRouterSpec.scala | 15 +- .../project/migration-guide-2.3.x-2.4.x.rst | 12 +- akka-docs/rst/scala/cluster-client.rst | 2 +- akka-docs/rst/scala/distributed-pub-sub.rst | 10 +- 10 files changed, 154 insertions(+), 90 deletions(-) diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index e02e9ffb2f..03eaec9f5c 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -6,7 +6,7 @@ # Make your edits/overrides in your application.conf. # //#pub-sub-ext-config -# Settings for the DistributedPubSubExtension +# Settings for the DistributedPubSub extension akka.cluster.pub-sub { # Actor name of the mediator actor, /user/distributedPubSubMediator name = distributedPubSubMediator diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index ba28f6b117..d5217a21d7 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -196,7 +196,7 @@ class ClusterClient( /** * Extension that starts [[ClusterReceptionist]] and accompanying [[akka.cluster.pubsub.DistributedPubSubMediator]] * with settings defined in config section `akka.cluster.client.receptionist`. - * The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSubExtension]]. + * The [[akka.cluster.pubsub.DistributedPubSubMediator]] is started by the [[akka.cluster.pubsub.DistributedPubSubExtension]] extension. */ object ClusterReceptionistExtension extends ExtensionId[ClusterReceptionistExtension] with ExtensionIdProvider { override def get(system: ActorSystem): ClusterReceptionistExtension = super.get(system) @@ -224,7 +224,7 @@ class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extensio /** * Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]]. */ - private def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator + private def pubSubMediator: ActorRef = DistributedPubSub(system).mediator /** * Register an actor that should be reachable for the clients. diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala index bf43737429..86790a14dc 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala @@ -37,36 +37,105 @@ import akka.routing.RoundRobinRoutingLogic import akka.routing.ConsistentHashingRoutingLogic import akka.routing.BroadcastRoutingLogic import scala.collection.immutable.TreeMap +import com.typesafe.config.Config +import akka.actor.NoSerializationVerificationNeeded +import akka.actor.Deploy + +object DistributedPubSubSettings { + /** + * Create settings from the default configuration + * `akka.cluster.pub-sub`. + */ + def apply(system: ActorSystem): DistributedPubSubSettings = + apply(system.settings.config.getConfig("akka.cluster.pub-sub")) + + /** + * Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.pub-sub`. + */ + def apply(config: Config): DistributedPubSubSettings = + new DistributedPubSubSettings( + role = roleOption(config.getString("role")), + routingLogic = config.getString("routing-logic") match { + case "random" ⇒ RandomRoutingLogic() + case "round-robin" ⇒ RoundRobinRoutingLogic() + case "consistent-hashing" ⇒ throw new IllegalArgumentException(s"'consistent-hashing' routing logic can't be used by the pub-sub mediator") + case "broadcast" ⇒ BroadcastRoutingLogic() + case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]") + }, + gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis, + removedTimeToLive = config.getDuration("removed-time-to-live", MILLISECONDS).millis, + maxDeltaElements = config.getInt("max-delta-elements")) + + /** + * Java API: Create settings from the default configuration + * `akka.cluster.pub-sub`. + */ + def create(system: ActorSystem): DistributedPubSubSettings = apply(system) + + /** + * Java API: Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.pub-sub`. + */ + def create(config: Config): DistributedPubSubSettings = apply(config) + + /** + * INTERNAL API + */ + private[akka] def roleOption(role: String): Option[String] = + if (role == "") None else Option(role) +} + +/** + * @param role Start the mediator on members tagged with this role. + * All members are used if undefined. + * @param routingLogic The routing logic to use for `Send`. + * @param gossipInterval How often the DistributedPubSubMediator should send out gossip information + * @param removedTimeToLive Removed entries are pruned after this duration + * @param maxDeltaElements Maximum number of elements to transfer in one message when synchronizing + * the registries. Next chunk will be transferred in next round of gossip. + */ +final class DistributedPubSubSettings( + val role: Option[String], + val routingLogic: RoutingLogic, + val gossipInterval: FiniteDuration, + val removedTimeToLive: FiniteDuration, + val maxDeltaElements: Int) extends NoSerializationVerificationNeeded { + + require(!routingLogic.isInstanceOf[ConsistentHashingRoutingLogic], + "'ConsistentHashingRoutingLogic' can't be used by the pub-sub mediator") + + def withRole(role: String): DistributedPubSubSettings = copy(role = DistributedPubSubSettings.roleOption(role)) + + def withRole(role: Option[String]): DistributedPubSubSettings = copy(role = role) + + def withRoutingLogic(routingLogic: RoutingLogic): DistributedPubSubSettings = + copy(routingLogic = routingLogic) + + def withGossipInterval(gossipInterval: FiniteDuration): DistributedPubSubSettings = + copy(gossipInterval = gossipInterval) + + def withRemovedTimeToLive(removedTimeToLive: FiniteDuration): DistributedPubSubSettings = + copy(removedTimeToLive = removedTimeToLive) + + def withMaxDeltaElements(maxDeltaElements: Int): DistributedPubSubSettings = + copy(maxDeltaElements = maxDeltaElements) + + private def copy(role: Option[String] = role, + routingLogic: RoutingLogic = routingLogic, + gossipInterval: FiniteDuration = gossipInterval, + removedTimeToLive: FiniteDuration = removedTimeToLive, + maxDeltaElements: Int = maxDeltaElements): DistributedPubSubSettings = + new DistributedPubSubSettings(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements) +} object DistributedPubSubMediator { /** * Scala API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]. */ - def props( - role: Option[String], - routingLogic: RoutingLogic = RandomRoutingLogic(), - gossipInterval: FiniteDuration = 1.second, - removedTimeToLive: FiniteDuration = 2.minutes, - maxDeltaElements: Int = 3000): Props = - Props(classOf[DistributedPubSubMediator], role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements) - - /** - * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]. - */ - def props( - role: String, - routingLogic: RoutingLogic, - gossipInterval: FiniteDuration, - removedTimeToLive: FiniteDuration, - maxDeltaElements: Int): Props = - props(Internal.roleOption(role), routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements) - - /** - * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]] - * with default values. - */ - def defaultProps(role: String): Props = props(Internal.roleOption(role)) + def props(settings: DistributedPubSubSettings): Props = + Props(new DistributedPubSubMediator(settings)).withDeploy(Deploy.local) @SerialVersionUID(1L) final case class Put(ref: ActorRef) @SerialVersionUID(1L) final case class Remove(path: String) @@ -172,11 +241,6 @@ object DistributedPubSubMediator { override def message = msg } - def roleOption(role: String): Option[String] = role match { - case null | "" ⇒ None - case _ ⇒ Some(role) - } - def encName(s: String) = URLEncoder.encode(s, "utf-8") trait TopicLike extends Actor { @@ -285,7 +349,7 @@ trait DistributedPubSubMessage extends Serializable * * The `DistributedPubSubMediator` is supposed to be started on all nodes, * or all nodes with specified role, in the cluster. The mediator can be - * started with the [[DistributedPubSubExtension]] or as an ordinary actor. + * started with the [[DistributedPubSub]] extension or as an ordinary actor. * * Changes are only performed in the own part of the registry and those changes * are versioned. Deltas are disseminated in a scalable way to other nodes with @@ -345,16 +409,11 @@ trait DistributedPubSubMessage extends Serializable * [[DistributedPubSubMediator.SubscribeAck]] and [[DistributedPubSubMediator.UnsubscribeAck]] * replies. */ -class DistributedPubSubMediator( - role: Option[String], - routingLogic: RoutingLogic, - gossipInterval: FiniteDuration, - removedTimeToLive: FiniteDuration, - maxDeltaElements: Int) - extends Actor with ActorLogging { +class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Actor with ActorLogging { import DistributedPubSubMediator._ import DistributedPubSubMediator.Internal._ + import settings._ require(!routingLogic.isInstanceOf[ConsistentHashingRoutingLogic], "'consistent-hashing' routing logic can't be used by the pub-sub mediator") @@ -648,32 +707,29 @@ class DistributedPubSubMediator( } } +object DistributedPubSub extends ExtensionId[DistributedPubSub] with ExtensionIdProvider { + override def get(system: ActorSystem): DistributedPubSub = super.get(system) + + override def lookup = DistributedPubSub + + override def createExtension(system: ExtendedActorSystem): DistributedPubSub = + new DistributedPubSub(system) +} + /** * Extension that starts a [[DistributedPubSubMediator]] actor * with settings defined in config section `akka.cluster.pub-sub`. */ -object DistributedPubSubExtension extends ExtensionId[DistributedPubSubExtension] with ExtensionIdProvider { - override def get(system: ActorSystem): DistributedPubSubExtension = super.get(system) +class DistributedPubSub(system: ExtendedActorSystem) extends Extension { - override def lookup = DistributedPubSubExtension - - override def createExtension(system: ExtendedActorSystem): DistributedPubSubExtension = - new DistributedPubSubExtension(system) -} - -class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension { - - private val config = system.settings.config.getConfig("akka.cluster.pub-sub") - private val role: Option[String] = config.getString("role") match { - case "" ⇒ None - case r ⇒ Some(r) - } + private val settings = DistributedPubSubSettings(system) /** * Returns true if this member is not tagged with the role configured for the * mediator. */ - def isTerminated: Boolean = Cluster(system).isTerminated || !role.forall(Cluster(system).selfRoles.contains) + def isTerminated: Boolean = + Cluster(system).isTerminated || !settings.role.forall(Cluster(system).selfRoles.contains) /** * The [[DistributedPubSubMediator]] @@ -682,19 +738,8 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension if (isTerminated) system.deadLetters else { - val routingLogic = config.getString("routing-logic") match { - case "random" ⇒ RandomRoutingLogic() - case "round-robin" ⇒ RoundRobinRoutingLogic() - case "consistent-hashing" ⇒ throw new IllegalArgumentException(s"'consistent-hashing' routing logic can't be used by the pub-sub mediator") - case "broadcast" ⇒ BroadcastRoutingLogic() - case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]") - } - val gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis - val removedTimeToLive = config.getDuration("removed-time-to-live", MILLISECONDS).millis - val maxDeltaElements = config.getInt("max-delta-elements") - val name = config.getString("name") - system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements), - name) + val name = system.settings.config.getString("akka.cluster.pub-sub.name") + system.actorOf(DistributedPubSubMediator.props(settings), name) } } } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index 6f8de09165..1f48d3b42e 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -72,7 +72,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod def awaitCount(expected: Int): Unit = { awaitAssert { - DistributedPubSubExtension(system).mediator ! DistributedPubSubMediator.Count + DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count expectMsgType[Int] should ===(expected) } } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubMediatorSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubMediatorSpec.scala index deea3f8225..af0cd9b3cc 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubMediatorSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubMediatorSpec.scala @@ -64,7 +64,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { class Publisher extends Actor { import DistributedPubSubMediator.Publish // activate the extension - val mediator = DistributedPubSubExtension(context.system).mediator + val mediator = DistributedPubSub(context.system).mediator def receive = { case in: String ⇒ @@ -77,7 +77,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { //#subscriber class Subscriber extends Actor with ActorLogging { import DistributedPubSubMediator.{ Subscribe, SubscribeAck } - val mediator = DistributedPubSubExtension(context.system).mediator + val mediator = DistributedPubSub(context.system).mediator // subscribe to the topic named "content" mediator ! Subscribe("content", self) @@ -114,8 +114,8 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia enterBarrier(from.name + "-joined") } - def createMediator(): ActorRef = DistributedPubSubExtension(system).mediator - def mediator: ActorRef = DistributedPubSubExtension(system).mediator + def createMediator(): ActorRef = DistributedPubSub(system).mediator + def mediator: ActorRef = DistributedPubSub(system).mediator var chatUsers: Map[String, ActorRef] = Map.empty diff --git a/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java b/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java index dc3ce68327..1fae155ed2 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java @@ -4,7 +4,10 @@ package akka.cluster.pubsub; +import com.typesafe.config.ConfigFactory; + import akka.testkit.AkkaJUnitActorSystemResource; + import org.junit.ClassRule; import org.junit.Test; @@ -19,7 +22,10 @@ public class DistributedPubSubMediatorTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest"); + new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest", + ConfigFactory.parseString( + "akka.actor.provider = \"akka.cluster.ClusterActorRefProvider\"\n" + + "akka.remote.netty.tcp.port=0")); private final ActorSystem system = actorSystemResource.getSystem(); @@ -47,7 +53,7 @@ public class DistributedPubSubMediatorTest { public Subscriber() { ActorRef mediator = - DistributedPubSubExtension.get(getContext().system()).mediator(); + DistributedPubSub.get(getContext().system()).mediator(); // subscribe to the topic named "content" mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf()); @@ -70,7 +76,7 @@ public class DistributedPubSubMediatorTest { // activate the extension ActorRef mediator = - DistributedPubSubExtension.get(getContext().system()).mediator(); + DistributedPubSub.get(getContext().system()).mediator(); public void onReceive(Object msg) { if (msg instanceof String) { diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala index b23222cefb..fd59a65cb6 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala @@ -4,6 +4,7 @@ import akka.testkit._ import akka.routing.{ ConsistentHashingRoutingLogic, RouterEnvelope } import org.scalatest.WordSpecLike import akka.actor.{ ActorInitializationException, ActorRef } +import com.typesafe.config.ConfigFactory case class WrappedMessage(msg: String) extends RouterEnvelope { override def message = msg @@ -15,6 +16,7 @@ object DistributedPubSubMediatorRouterSpec { def config(routingLogic: String) = s""" akka.loglevel = INFO akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.netty.tcp.port=0 akka.remote.log-remote-lifecycle-events = off akka.cluster.pub-sub.routing-logic = $routingLogic """ @@ -85,7 +87,7 @@ class DistributedPubSubMediatorWithRandomRouterSpec extends AkkaSpec(DistributedPubSubMediatorRouterSpec.config("random")) with DistributedPubSubMediatorRouterSpec with DefaultTimeout with ImplicitSender { - val mediator = DistributedPubSubExtension(system).mediator + val mediator = DistributedPubSub(system).mediator "DistributedPubSubMediator when sending wrapped message" must { val msg = WrappedMessage("hello") @@ -106,13 +108,14 @@ class DistributedPubSubMediatorWithHashRouterSpec "not be allowed" when { "constructed by extension" in { intercept[IllegalArgumentException] { - DistributedPubSubExtension(system).mediator + DistributedPubSub(system).mediator } } - "constructed by props" in { - EventFilter[ActorInitializationException](occurrences = 1) intercept { - system.actorOf( - DistributedPubSubMediator.props(None, routingLogic = ConsistentHashingRoutingLogic(system))) + "constructed by settings" in { + intercept[IllegalArgumentException] { + val config = ConfigFactory.parseString(DistributedPubSubMediatorRouterSpec.config("random")) + .withFallback(system.settings.config).getConfig("akka.cluster.pub-sub") + DistributedPubSubSettings(config).withRoutingLogic(ConsistentHashingRoutingLogic(system)) } } } diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index e9690f3469..5a34084f2b 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -221,7 +221,17 @@ The configuration properties changed name to ``akka.cluster.sharding``. ClusterSingletonManager and ClusterSingletonProxy construction ============================================================== -Parameters to the ``Props`` factory methods have been moved to settings object ``ClusterSingletonManagerSettings` +Parameters to the ``Props`` factory methods have been moved to settings object ``ClusterSingletonManagerSettings`` and ``ClusterSingletonProxySettings``. These can be created from system configuration properties and also amended with API as needed. +DistributedPubSub construction +============================== + +Normally, the ``DistributedPubSubMediator`` is started by the ``DistributedPubSubExtension``. +This extension has been renamed to ``DistributedPubSub``. It is also possible to start +it as an ordinary actor if you need multiple instances of it with different settings. +The parameters of the ``Props`` factory methods in the ``DistributedPubSubMediator`` companion +has been moved to settings object ``DistributedPubSubSettings``. This can be created from +system configuration properties and also amended with API as needed. + \ No newline at end of file diff --git a/akka-docs/rst/scala/cluster-client.rst b/akka-docs/rst/scala/cluster-client.rst index a625a85c2e..fb0db28be8 100644 --- a/akka-docs/rst/scala/cluster-client.rst +++ b/akka-docs/rst/scala/cluster-client.rst @@ -101,7 +101,7 @@ The ``ClusterReceptionistExtension`` can be configured with the following proper .. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#receptionist-ext-config -Note that the ``ClusterReceptionistExtension`` uses the ``DistributedPubSubExtension``, which is described +Note that the ``ClusterReceptionistExtension`` uses the ``DistributedPubSub`` extension, which is described in :ref:`distributed-pub-sub`. It is recommended to load the extension when the actor system is started by defining it in the diff --git a/akka-docs/rst/scala/distributed-pub-sub.rst b/akka-docs/rst/scala/distributed-pub-sub.rst index 64d74c91eb..cdb4c2d9cd 100644 --- a/akka-docs/rst/scala/distributed-pub-sub.rst +++ b/akka-docs/rst/scala/distributed-pub-sub.rst @@ -14,7 +14,7 @@ actors among all cluster nodes or a group of nodes tagged with a specific role. The `DistributedPubSubMediator` is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The mediator can be -started with the ``DistributedPubSubExtension`` or as an ordinary actor. +started with the ``DistributedPubSub`` or as an ordinary actor. Changes are only performed in the own part of the registry and those changes are versioned. Deltas are disseminated in a scalable way to other nodes with @@ -122,16 +122,16 @@ It can publish messages to the topic from anywhere in the cluster: A more comprehensive sample is available in the `Typesafe Activator `_ tutorial named `Akka Clustered PubSub with Scala! `_. -DistributedPubSubExtension +DistributedPubSub -------------------------- -In the example above the mediator is started and accessed with the ``akka.cluster.pubsub.DistributedPubSubExtension``. +In the example above the mediator is started and accessed with the ``akka.cluster.pubsub.DistributedPubSub``. That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to start the mediator actor as an ordinary actor and you can have several different mediators at the same time to be able to divide a large number of actors/topics to different mediators. For example you might want to use different cluster roles for different mediators. -The ``DistributedPubSubExtension`` can be configured with the following properties: +The ``DistributedPubSub`` can be configured with the following properties: .. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#pub-sub-ext-config @@ -141,7 +141,7 @@ and then it takes a while for it to be populated. :: - akka.extensions = ["akka.cluster.pubsub.DistributedPubSubExtension"] + akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"] Dependencies ------------