diff --git a/akka-contrib/docs/cluster-singleton.rst b/akka-contrib/docs/cluster-singleton.rst index d80e8eb013..d28a548b0a 100644 --- a/akka-contrib/docs/cluster-singleton.rst +++ b/akka-contrib/docs/cluster-singleton.rst @@ -88,6 +88,10 @@ the group of members tagged with a specific role. Note that the hand-over might still be in progress and the singleton actor might not be started yet when you receive the ``LeaderChanged`` / ``RoleLeaderChanged`` event. +A nice alternative to the above proxy is to use :ref:`distributed-pub-sub`. Let the singleton +actor register itself to the mediator with ``DistributedPubSubMediator.Put`` message when it is +started. Send messages to the singleton actor via the mediator with ``DistributedPubSubMediator.SendToAll``. + To test scenarios where the cluster leader node is removed or shut down you can use :ref:`multi-node-testing` and utilize the fact that the leader is supposed to be the first member when sorted by member address. diff --git a/akka-contrib/docs/distributed-pub-sub.rst b/akka-contrib/docs/distributed-pub-sub.rst new file mode 100644 index 0000000000..01aebd0aed --- /dev/null +++ b/akka-contrib/docs/distributed-pub-sub.rst @@ -0,0 +1,129 @@ +.. _distributed-pub-sub: + +Distributed Publish Subscribe in Cluster +======================================== + +How do I send a message to an actor without knowing which node it is running on? + +How do I send messages to all actors in the cluster that have registered interest +in a named topic? + +This pattern provides a mediator actor, ``akka.contrib.pattern.DistributedPubSubMediator``, +that manages a registry of actor references and replicates the entries to peer +actors among all cluster nodes or a group of nodes tagged with a specific role. + +The `DistributedPubSubMediator` is supposed to be started on all nodes, +or all nodes with specified role, in the cluster. The mediator can be +started with the ``DistributedPubSubExtension`` or as an ordinary actor. + +Changes are only performed in the own part of the registry and those changes +are versioned. Deltas are disseminated in a scalable way to other nodes with +a gossip protocol. The registry is eventually consistent, i.e. changes are not +immediately visible at other nodes, but typically they will be fully replicated +to all other nodes after a few seconds. + +You can send messages via the mediator on any node to registered actors on +any other node. There is three modes of message delivery. + +**1. DistributedPubSubMediator.Send** + +The message will be delivered to one recipient with a matching path, if any such +exists in the registry. If several entries match the path the message will be delivered +to one random destination. The sender of the message can specify that local +affinity is preferred, i.e. the message is sent to an actor in the same local actor +system as the used mediator actor, if any such exists, otherwise random to any other +matching entry. A typical usage of this mode is private chat to one other user in +an instant messaging application. It can also be used for distributing tasks to workers, +like a random router. + +**2. DistributedPubSubMediator.SendToAll** + +The message will be delivered to all recipients with a matching path. Actors with +the same path, without address information, can be registered on different nodes. +On each node there can only be one such actor, since the path is unique within one +local actor system. Typical usage of this mode is to broadcast messages to all replicas +with the same path, e.g. 3 actors on different nodes that all perform the same actions, +for redundancy. + +**3. DistributedPubSubMediator.Publish** + +Actors may be registered to a named topic instead of path. This enables many subscribers +on each node. The message will be delivered to all subscribers of the topic. For +efficiency the message is sent over the wire only once per node (that has a matching topic), +and then delivered to all subscribers of the local topic representation. This is the +true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging +application. + +You register actors to the local mediator with ``DistributedPubSubMediator.Put`` or +``DistributedPubSubMediator.Subscribe``. ``Put`` is used together with ``Send`` and +``SendToAll`` message delivery modes. The ``ActorRef`` in ``Put`` must belong to the same +local actor system as the mediator. ``Subscribe`` is used together with ``Publish``. +Actors are automatically removed from the registry when they are terminated, or you +can explicitly remove entries with ``DistributedPubSubMediator.Remove`` or +``DistributedPubSubMediator.Unsubscribe``. + +Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with +``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck`` +replies. + +A Small Example in Java +----------------------- + +A subscriber actor: + +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#subscriber + +Subscriber actors can be started on several nodes in the cluster, and all will receive +messages published to the "content" topic. + +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#start-subscribers + +A simple actor that publishes to this "content" topic: + +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#publisher + +It can publish messages to the topic from anywhere in the cluster: + +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#publish-message + +A Small Example in Scala +------------------------ + +A subscriber actor: + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#subscriber + +Subscriber actors can be started on several nodes in the cluster, and all will receive +messages published to the "content" topic. + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#start-subscribers + +A simple actor that publishes to this "content" topic: + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#publisher + +It can publish messages to the topic from anywhere in the cluster: + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#publish-message + +DistributedPubSubExtension +-------------------------- + +In the example above the mediator is started and accessed with the ``akka.contrib.pattern.DistributedPubSubExtension``. +That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to +start the mediator actor as an ordinary actor and you can have several different mediators at the same +time to be able to divide a large number of actors/topics to different mediators. For example you might +want to use different cluster roles for different mediators. + +The ``DistributedPubSubExtension`` can be configured with the following properties: + +.. includecode:: @contribSrc@/src/main/resources/reference.conf#pub-sub-ext-config + +It is recommended to load the extension when the actor system is started by defining it in +``akka.extensions`` configuration property. Otherwise it will be activated when first used +and then it takes a while for it to be populated. + +:: + + akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"] + diff --git a/akka-contrib/docs/index.rst b/akka-contrib/docs/index.rst index 5dd2117905..ba2e911041 100644 --- a/akka-contrib/docs/index.rst +++ b/akka-contrib/docs/index.rst @@ -35,6 +35,7 @@ The Current List of Modules jul peek-mailbox cluster-singleton + distributed-pub-sub Suggested Way of Using these Contributions ------------------------------------------ diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf new file mode 100644 index 0000000000..5f02965181 --- /dev/null +++ b/akka-contrib/src/main/resources/reference.conf @@ -0,0 +1,24 @@ +###################################### +# Akka Contrib Reference Config File # +###################################### + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +# //#pub-sub-ext-config +# Settings for the DistributedPubSubExtension +akka.contrib.cluster.pub-sub { + # Actor name of the mediator actor, /user/distributedPubSubMediator + name = distributedPubSubMediator + + # Start the mediator on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # How often the DistributedPubSubMediator should send out gossip information + gossip-interval = 1s + + # Removed entries are pruned after this duration + removed-time-to-live = 120s +} +# //#pub-sub-ext-config diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index 9d8f5cebc4..8dbdb0be0e 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -337,10 +337,8 @@ class ClusterSingletonManager( val cluster = Cluster(context.system) val selfAddressOption = Some(cluster.selfAddress) - role match { - case None ⇒ - case Some(r) ⇒ require(cluster.selfRoles.contains(r), s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") - } + require(role.forall(cluster.selfRoles.contains), + s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") // started when when self member is Up var leaderChangedBuffer: ActorRef = _ diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala new file mode 100644 index 0000000000..e8f9c16b4a --- /dev/null +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -0,0 +1,455 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.contrib.pattern + +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import java.net.URLEncoder + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorPath +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.Props +import akka.actor.Terminated +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.cluster.MemberStatus + +object DistributedPubSubMediator { + @SerialVersionUID(1L) + case class Put(ref: ActorRef) + @SerialVersionUID(1L) + case class Remove(path: String) + @SerialVersionUID(1L) + case class Subscribe(topic: String, ref: ActorRef) + @SerialVersionUID(1L) + case class Unsubscribe(topic: String, ref: ActorRef) + @SerialVersionUID(1L) + case class SubscribeAck(subscribe: Subscribe) + @SerialVersionUID(1L) + case class UnsubscribeAck(unsubscribe: Unsubscribe) + @SerialVersionUID(1L) + case class Send(path: String, msg: Any, localAffinity: Boolean) + @SerialVersionUID(1L) + case class SendToAll(path: String, msg: Any) + @SerialVersionUID(1L) + case class Publish(topic: String, msg: Any) + + // Only for testing purposes, to poll/await replication + case object Count + + /** + * INTERNAL API + */ + private[pattern] object Internal { + case object Prune + + @SerialVersionUID(1L) + case class Bucket( + owner: Address, + version: Long, + content: Map[String, ValueHolder]) + + @SerialVersionUID(1L) + case class ValueHolder(version: Long, ref: Option[ActorRef]) + + @SerialVersionUID(1L) + case class Status(versions: Map[Address, Long]) + @SerialVersionUID(1L) + case class Delta(buckets: immutable.Iterable[Bucket]) + + case object GossipTick + + def roleOption(role: String): Option[String] = role match { + case null | "" ⇒ None + case _ ⇒ Some(role) + } + + class Topic(emptyTimeToLive: FiniteDuration) extends Actor { + import context.dispatcher + val pruneInterval: FiniteDuration = emptyTimeToLive / 2 + val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune) + var pruneDeadline: Option[Deadline] = None + + var subscribers = Set.empty[ActorRef] + + override def postStop(): Unit = { + super.postStop() + pruneTask.cancel() + } + + def receive = { + case msg @ Subscribe(_, ref) ⇒ + context watch ref + subscribers += ref + pruneDeadline = None + sender.tell(SubscribeAck(msg), context.parent) + case msg @ Unsubscribe(_, ref) ⇒ + context unwatch ref + remove(ref) + sender.tell(UnsubscribeAck(msg), context.parent) + case Terminated(ref) ⇒ + remove(ref) + case Prune ⇒ + for (d ← pruneDeadline if d.isOverdue) context stop self + case msg ⇒ + subscribers foreach { _ forward msg } + } + + def remove(ref: ActorRef): Unit = + if (subscribers.contains(ref)) { + subscribers -= ref + if (subscribers.isEmpty) + pruneDeadline = Some(Deadline.now + emptyTimeToLive) + } + + } + } +} + +/** + * This actor manages a registry of actor references and replicates + * the entries to peer actors among all cluster nodes or a group of nodes + * tagged with a specific role. + * + * The `DistributedPubSubMediator` is supposed to be started on all nodes, + * or all nodes with specified role, in the cluster. The mediator can be + * started with the [[DistributedPubSubExtension]] or as an ordinary actor. + * + * Changes are only performed in the own part of the registry and those changes + * are versioned. Deltas are disseminated in a scalable way to other nodes with + * a gossip protocol. The registry is eventually consistent, i.e. changes are not + * immediately visible at other nodes, but typically they will be fully replicated + * to all other nodes after a few seconds. + * + * You can send messages via the mediator on any node to registered actors on + * any other node. There is three modes of message delivery. + * + * 1. [[DistributedPubSubMediator.Send]] - + * The message will be delivered to one recipient with a matching path, if any such + * exists in the registry. If several entries match the path the message will be delivered + * to one random destination. The sender of the message can specify that local + * affinity is preferred, i.e. the message is sent to an actor in the same local actor + * system as the used mediator actor, if any such exists, otherwise random to any other + * matching entry. A typical usage of this mode is private chat to one other user in + * an instant messaging application. It can also be used for distributing tasks to workers, + * like a random router. + * + * 2. [[DistributedPubSubMediator.SendToAll]] - + * The message will be delivered to all recipients with a matching path. Actors with + * the same path, without address information, can be registered on different nodes. + * On each node there can only be one such actor, since the path is unique within one + * local actor system. Typical usage of this mode is to broadcast messages to all replicas + * with the same path, e.g. 3 actors on different nodes that all perform the same actions, + * for redundancy. + * + * 3. [[DistributedPubSubMediator.Publish]] - + * Actors may be registered to a named topic instead of path. This enables many subscribers + * on each node. The message will be delivered to all subscribers of the topic. For + * efficiency the message is sent over the wire only once per node (that has a matching topic), + * and then delivered to all subscribers of the local topic representation. This is the + * true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging + * application. + * + * You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or + * [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and + * `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same + * local actor system as the mediator. `Subscribe` is used together with `Publish`. + * Actors are automatically removed from the registry when they are terminated, or you + * can explicitly remove entries with [[DistributedPubSubMediator.Remove]] or + * [[DistributedPubSubMediator.Unsubscribe]]. + * + * Successful `Subscribe` and `Unsubscribe` is acknowledged with + * [[DistributedPubSubMediator.SubscribeAck]] and [[DistributedPubSubMediator.UnsubscribeAck]] + * replies. + */ +class DistributedPubSubMediator( + role: Option[String], + gossipInterval: FiniteDuration = 1.second, + removedTimeToLive: FiniteDuration = 2.minutes) + extends Actor with ActorLogging { + + /** + * Java API constructor with default values. + */ + def this(role: String) = this(DistributedPubSubMediator.Internal.roleOption(role)) + + import DistributedPubSubMediator._ + import DistributedPubSubMediator.Internal._ + + val cluster = Cluster(context.system) + import cluster.selfAddress + + require(role.forall(cluster.selfRoles.contains), + s"This cluster member [${selfAddress}] doesn't have the role [$role]") + + val removedTimeToLiveMillis = removedTimeToLive.toMillis + + //Start periodic gossip to random nodes in cluster + import context.dispatcher + val gossipTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, GossipTick) + val pruneInterval: FiniteDuration = removedTimeToLive / 2 + val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune) + + var registry: Map[Address, Bucket] = Map.empty.withDefault(a ⇒ Bucket(a, 0L, Map.empty)) + var nodes: Set[Address] = Set.empty + + // the version is a timestamp because it is also used when pruning removed entries + val nextVersion = { + var version = 0L + () ⇒ { + val current = System.currentTimeMillis + version = if (current > version) current else version + 1 + version + } + } + + override def preStart(): Unit = { + super.preStart() + require(!cluster.isTerminated, "Cluster node must not be terminated") + cluster.subscribe(self, classOf[MemberEvent]) + } + + override def postStop(): Unit = { + super.postStop() + cluster unsubscribe self + gossipTask.cancel() + pruneTask.cancel() + } + + def matchingRole(m: Member): Boolean = role.forall(m.hasRole) + + def receive = { + + case Send(path, msg, localAffinity) ⇒ + registry(selfAddress).content.get(path) match { + case Some(ValueHolder(_, Some(ref))) if localAffinity ⇒ + ref forward msg + case _ ⇒ + val refs = (for { + (_, bucket) ← registry + valueHolder ← bucket.content.get(path) + ref ← valueHolder.ref + } yield ref).toVector + + if (refs.nonEmpty) + refs(ThreadLocalRandom.current.nextInt(refs.size)) forward msg + } + + case SendToAll(path, msg) ⇒ + publish(path, msg) + + case Publish(topic, msg) ⇒ + publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg) + + case Put(ref: ActorRef) ⇒ + if (ref.path.address.hasGlobalScope) + log.warning("Registered actor must be local: [{}]", ref) + else { + put(mkKey(ref), Some(ref)) + context.watch(ref) + } + + case Remove(key) ⇒ + registry(selfAddress).content.get(key) match { + case Some(ValueHolder(_, Some(ref))) ⇒ + context.unwatch(ref) + put(key, None) + case _ ⇒ + } + + case msg @ Subscribe(topic, _) ⇒ + // each topic is managed by a child actor with the same name as the topic + val encTopic = URLEncoder.encode(topic, "utf-8") + context.child(encTopic) match { + case Some(t) ⇒ t forward msg + case None ⇒ + val t = context.actorOf(Props(new Topic(removedTimeToLive)), name = encTopic) + t forward msg + put(mkKey(t), Some(t)) + context.watch(t) + } + + case msg @ Unsubscribe(topic, _) ⇒ + context.child(URLEncoder.encode(topic, "utf-8")) match { + case Some(g) ⇒ g ! msg + case None ⇒ // no such topic here + } + + case Status(otherVersions) ⇒ + // gossip chat starts with a Status message, containing the bucket versions of the other node + val delta = collectDelta(otherVersions) + if (delta.nonEmpty) + sender ! Delta(delta) + if (otherHasNewerVersions(otherVersions)) + sender ! Status(versions = myVersions) // it will reply with Delta + + case Delta(buckets) ⇒ + // reply from Status message in the gossip chat + // the Delta contains potential updates (newer versions) from the other node + if (nodes.contains(sender.path.address)) { + buckets foreach { b ⇒ + val myBucket = registry(b.owner) + if (b.version > myBucket.version) { + registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content)) + } + } + } + + case GossipTick ⇒ gossip() + + case Prune ⇒ prune() + + case Terminated(a) ⇒ + val key = mkKey(a) + registry(selfAddress).content.get(key) match { + case Some(ValueHolder(_, Some(`a`))) ⇒ + // remove + put(key, None) + case _ ⇒ + } + + case state: CurrentClusterState ⇒ + nodes = state.members.collect { + case m if m.status != MemberStatus.Joining && matchingRole(m) ⇒ m.address + } + + case MemberUp(m) ⇒ + if (matchingRole(m)) + nodes += m.address + + case MemberRemoved(m) ⇒ + if (m.address == selfAddress) + context stop self + else if (matchingRole(m)) { + nodes -= m.address + registry -= m.address + } + + case _: MemberEvent ⇒ // not of interest + + case Count ⇒ + val count = registry.map { + case (owner, bucket) ⇒ bucket.content.count { + case (_, valueHolder) ⇒ valueHolder.ref.isDefined + } + }.sum + sender ! count + } + + def publish(path: String, msg: Any): Unit = { + for { + (_, bucket) ← registry + valueHolder ← bucket.content.get(path) + ref ← valueHolder.ref + } ref forward msg + } + + def put(key: String, valueOption: Option[ActorRef]): Unit = { + val bucket = registry(selfAddress) + val v = nextVersion() + registry += (selfAddress -> bucket.copy(version = v, + content = bucket.content + (key -> ValueHolder(v, valueOption)))) + } + + def mkKey(ref: ActorRef): String = mkKey(ref.path) + + def mkKey(path: ActorPath): String = path.elements.mkString("/", "/", "") + + def myVersions: Map[Address, Long] = registry.map { case (owner, bucket) ⇒ (owner -> bucket.version) } + + def collectDelta(otherVersions: Map[Address, Long]): immutable.Iterable[Bucket] = { + // missing entries are represented by version 0 + val filledOtherVersions = myVersions.map { case (k, _) ⇒ k -> 0L } ++ otherVersions + filledOtherVersions.collect { + case (owner, v) if registry(owner).version > v ⇒ + val bucket = registry(owner) + val deltaContent = bucket.content.filter { + case (_, value) ⇒ value.version > v + } + bucket.copy(content = deltaContent) + } + } + + def otherHasNewerVersions(otherVersions: Map[Address, Long]): Boolean = + otherVersions.exists { + case (owner, v) ⇒ v > registry(owner).version + } + + /** + * Gossip to peer nodes. + */ + def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo + + def gossipTo(address: Address): Unit = { + context.actorSelection(self.path.toStringWithAddress(address)) ! Status(versions = myVersions) + } + + def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] = + if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) + + def prune(): Unit = { + registry foreach { + case (owner, bucket) ⇒ + val oldRemoved = bucket.content.collect { + case (key, ValueHolder(version, None)) if (bucket.version - version > removedTimeToLiveMillis) ⇒ key + } + if (oldRemoved.nonEmpty) + registry += owner -> bucket.copy(content = bucket.content -- oldRemoved) + } + } +} + +/** + * Extension that starts a [[DistributedPubSubMediator]] actor + * with settings defined in config section `akka.contrib.cluster.pub-sub`. + */ +object DistributedPubSubExtension extends ExtensionId[DistributedPubSubExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): DistributedPubSubExtension = super.get(system) + + override def lookup = DistributedPubSubExtension + + override def createExtension(system: ExtendedActorSystem): DistributedPubSubExtension = + new DistributedPubSubExtension(system) +} + +class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension { + + private val config = system.settings.config.getConfig("akka.contrib.cluster.pub-sub") + private val role: Option[String] = config.getString("role") match { + case "" ⇒ None + case r ⇒ Some(r) + } + + /** + * Returns true if this member is not tagged with the role configured for the + * mediator. + */ + def isTerminated: Boolean = Cluster(system).isTerminated || !role.forall(Cluster(system).selfRoles.contains) + + /** + * The [[DistributedPubSubMediator]] + */ + val mediator: ActorRef = { + if (isTerminated) + system.deadLetters + else { + val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS) + val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS) + val name = config.getString("name") + system.actorOf(Props(new DistributedPubSubMediator(role, gossipInterval, removedTimeToLive)), + name) + } + } +} diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala new file mode 100644 index 0000000000..a3dc568d8a --- /dev/null +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -0,0 +1,322 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.contrib.pattern + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.PoisonPill +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ActorLogging + +object DistributedPubSubMediatorSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + akka.cluster.auto-down = on + """)) + + object TestChatUser { + case class Whisper(path: String, msg: Any) + case class Talk(path: String, msg: Any) + case class Shout(topic: String, msg: Any) + } + + class TestChatUser(mediator: ActorRef, testActor: ActorRef) extends Actor { + import TestChatUser._ + import DistributedPubSubMediator._ + + def receive = { + case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) + case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) + case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) + case msg ⇒ testActor ! msg + } + } + + //#publisher + class Publisher extends Actor { + import DistributedPubSubMediator.Publish + // activate the extension + val mediator = DistributedPubSubExtension(context.system).mediator + + def receive = { + case in: String ⇒ + val out = in.toUpperCase + mediator ! Publish("content", out) + } + } + //#publisher + + //#subscriber + class Subscriber extends Actor with ActorLogging { + import DistributedPubSubMediator.{ Subscribe, SubscribeAck } + val mediator = DistributedPubSubExtension(context.system).mediator + // subscribe to the topic named "content" + mediator ! Subscribe("content", self) + + def receive = { + case SubscribeAck(Subscribe("content", `self`)) ⇒ + context become ready + } + + def ready: Actor.Receive = { + case s: String ⇒ + log.info("Got {}", s) + } + } + //#subscriber + +} + +class DistributedPubSubMediatorMultiJvmNode1 extends DistributedPubSubMediatorSpec +class DistributedPubSubMediatorMultiJvmNode2 extends DistributedPubSubMediatorSpec +class DistributedPubSubMediatorMultiJvmNode3 extends DistributedPubSubMediatorSpec + +class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMediatorSpec) with STMultiNodeSpec with ImplicitSender { + import DistributedPubSubMediatorSpec._ + import DistributedPubSubMediatorSpec.TestChatUser._ + import DistributedPubSubMediator._ + + override def initialParticipants = roles.size + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + createMediator() + } + enterBarrier(from.name + "-joined") + } + + def createMediator(): ActorRef = DistributedPubSubExtension(system).mediator + def mediator: ActorRef = DistributedPubSubExtension(system).mediator + + var chatUsers: Map[String, ActorRef] = Map.empty + + def createChatUser(name: String): ActorRef = { + var a = system.actorOf(Props(new TestChatUser(mediator, testActor)), name) + chatUsers += (name -> a) + a + } + + def chatUser(name: String): ActorRef = chatUsers(name) + + def awaitCount(expected: Int): Unit = { + awaitAssert { + mediator ! Count + expectMsgType[Int] must be(expected) + } + } + + "A DistributedPubSubMediator" must { + + "startup 2 node cluster" in within(15 seconds) { + join(first, first) + join(second, first) + enterBarrier("after-1") + } + + "keep track of added users" in within(15 seconds) { + runOn(first) { + val u1 = createChatUser("u1") + mediator ! Put(u1) + + val u2 = createChatUser("u2") + mediator ! Put(u2) + + awaitCount(2) + + // send to actor at same node + u1 ! Whisper("/user/u2", "hello") + expectMsg("hello") + lastSender must be(u2) + } + + runOn(second) { + val u3 = createChatUser("u3") + mediator ! Put(u3) + } + + runOn(first, second) { + awaitCount(3) + } + enterBarrier("3-registered") + + runOn(second) { + val u4 = createChatUser("u4") + mediator ! Put(u4) + } + + runOn(first, second) { + awaitCount(4) + } + enterBarrier("4-registered") + + runOn(first) { + // send to actor on another node + chatUser("u1") ! Whisper("/user/u4", "hi there") + } + + runOn(second) { + expectMsg("hi there") + lastSender.path.name must be("u4") + } + + enterBarrier("after-2") + } + + "replicate users to new node" in within(20 seconds) { + join(third, first) + + runOn(third) { + val u5 = createChatUser("u5") + mediator ! Put(u5) + } + + awaitCount(5) + enterBarrier("5-registered") + + runOn(third) { + chatUser("u5") ! Whisper("/user/u4", "go") + } + + runOn(second) { + expectMsg("go") + lastSender.path.name must be("u4") + } + + enterBarrier("after-3") + } + + "keep track of removed users" in within(15 seconds) { + runOn(first) { + val u6 = createChatUser("u6") + mediator ! Put(u6) + } + awaitCount(6) + enterBarrier("6-registered") + + runOn(first) { + mediator ! Remove("/user/u6") + } + awaitCount(5) + + enterBarrier("after-4") + } + + "remove terminated users" in within(5 seconds) { + runOn(second) { + chatUser("u3") ! PoisonPill + } + + awaitCount(4) + enterBarrier("after-5") + } + + "publish" in within(15 seconds) { + runOn(first, second) { + val u7 = createChatUser("u7") + mediator ! Put(u7) + } + awaitCount(6) + enterBarrier("7-registered") + + runOn(third) { + chatUser("u5") ! Talk("/user/u7", "hi") + } + + runOn(first, second) { + expectMsg("hi") + lastSender.path.name must be("u7") + } + runOn(third) { + expectNoMsg(2.seconds) + } + + enterBarrier("after-6") + } + + "publish to topic" in within(15 seconds) { + runOn(first) { + val s8 = Subscribe("topic1", createChatUser("u8")) + mediator ! s8 + expectMsg(SubscribeAck(s8)) + val s9 = Subscribe("topic1", createChatUser("u9")) + mediator ! s9 + expectMsg(SubscribeAck(s9)) + } + runOn(second) { + val s10 = Subscribe("topic1", createChatUser("u10")) + mediator ! s10 + expectMsg(SubscribeAck(s10)) + } + // one topic on two nodes + awaitCount(8) + enterBarrier("topic1-registered") + + runOn(third) { + chatUser("u5") ! Shout("topic1", "hello all") + } + + runOn(first) { + val names = receiveWhile(messages = 2) { + case "hello all" ⇒ lastSender.path.name + } + names.toSet must be(Set("u8", "u9")) + } + runOn(second) { + expectMsg("hello all") + lastSender.path.name must be("u10") + } + runOn(third) { + expectNoMsg(2.seconds) + } + + enterBarrier("after-7") + } + + "demonstrate usage" in within(15 seconds) { + def later(): Unit = { + awaitCount(10) + } + + //#start-subscribers + runOn(first) { + system.actorOf(Props[Subscriber], "subscriber1") + } + runOn(second) { + system.actorOf(Props[Subscriber], "subscriber2") + system.actorOf(Props[Subscriber], "subscriber3") + } + //#start-subscribers + + //#publish-message + runOn(third) { + val publisher = system.actorOf(Props[Publisher], "publisher") + later() + // after a while the subscriptions are replicated + publisher ! "hello" + } + //#publish-message + + enterBarrier("after-8") + } + + } +} diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java new file mode 100644 index 0000000000..2b6f0a0943 --- /dev/null +++ b/akka-contrib/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.contrib.pattern; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +public class DistributedPubSubMediatorTest { + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateUsage() { + //#start-subscribers + system.actorOf(new Props(Subscriber.class), "subscriber1"); + //another node + system.actorOf(new Props(Subscriber.class), "subscriber2"); + system.actorOf(new Props(Subscriber.class), "subscriber3"); + //#start-subscribers + + //#publish-message + //somewhere else + ActorRef publisher = system.actorOf(new Props(Publisher.class), "publisher"); + // after a while the subscriptions are replicated + publisher.tell("hello", null); + //#publish-message + } + + static//#subscriber + public class Subscriber extends UntypedActor { + LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + public Subscriber() { + ActorRef mediator = + DistributedPubSubExtension.get(getContext().system()).mediator(); + // subscribe to the topic named "content" + mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), + getSelf()); + } + + public void onReceive(Object msg) { + if (msg instanceof String) + log.info("Got: {}", msg); + else if (msg instanceof DistributedPubSubMediator.SubscribeAck) + log.info("subscribing"); + else + unhandled(msg); + } + } + + //#subscriber + + static//#publisher + public class Publisher extends UntypedActor { + + // activate the extension + ActorRef mediator = + DistributedPubSubExtension.get(getContext().system()).mediator(); + + public void onReceive(Object msg) { + if (msg instanceof String) { + String in = (String) msg; + String out = in.toUpperCase(); + mediator.tell(new DistributedPubSubMediator.Publish("content", out), + getSelf()); + } else { + unhandled(msg); + } + } + } + //#publisher +} diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index 09a4f6d3c8..c8e3de3789 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -269,6 +269,11 @@ events, but there are several corner cases to consider. Therefore, this specific case is made easily accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is, or adjust to fit your specific needs. +Distributed Publish Subscribe Pattern +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +See :ref:`distributed-pub-sub` in the contrib module. + Failure Detector ^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index 804234a892..a54f9a7472 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -257,6 +257,11 @@ events, but there are several corner cases to consider. Therefore, this specific case is made easily accessible by the :ref:`cluster-singleton` in the contrib module. You can use it as is, or adjust to fit your specific needs. +Distributed Publish Subscribe Pattern +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +See :ref:`distributed-pub-sub` in the contrib module. + Failure Detector ^^^^^^^^^^^^^^^^