Distributed pub-sub in cluster, see #3203

This commit is contained in:
Patrik Nordwall 2013-04-06 16:22:30 +02:00
parent d5a658f433
commit 1d1a6383df
10 changed files with 1039 additions and 4 deletions

View file

@ -88,6 +88,10 @@ the group of members tagged with a specific role.
Note that the hand-over might still be in progress and the singleton actor might not be started yet
when you receive the ``LeaderChanged`` / ``RoleLeaderChanged`` event.
A nice alternative to the above proxy is to use :ref:`distributed-pub-sub`. Let the singleton
actor register itself to the mediator with ``DistributedPubSubMediator.Put`` message when it is
started. Send messages to the singleton actor via the mediator with ``DistributedPubSubMediator.SendToAll``.
To test scenarios where the cluster leader node is removed or shut down you can use :ref:`multi-node-testing` and
utilize the fact that the leader is supposed to be the first member when sorted by member address.

View file

@ -0,0 +1,129 @@
.. _distributed-pub-sub:
Distributed Publish Subscribe in Cluster
========================================
How do I send a message to an actor without knowing which node it is running on?
How do I send messages to all actors in the cluster that have registered interest
in a named topic?
This pattern provides a mediator actor, ``akka.contrib.pattern.DistributedPubSubMediator``,
that manages a registry of actor references and replicates the entries to peer
actors among all cluster nodes or a group of nodes tagged with a specific role.
The `DistributedPubSubMediator` is supposed to be started on all nodes,
or all nodes with specified role, in the cluster. The mediator can be
started with the ``DistributedPubSubExtension`` or as an ordinary actor.
Changes are only performed in the own part of the registry and those changes
are versioned. Deltas are disseminated in a scalable way to other nodes with
a gossip protocol. The registry is eventually consistent, i.e. changes are not
immediately visible at other nodes, but typically they will be fully replicated
to all other nodes after a few seconds.
You can send messages via the mediator on any node to registered actors on
any other node. There is three modes of message delivery.
**1. DistributedPubSubMediator.Send**
The message will be delivered to one recipient with a matching path, if any such
exists in the registry. If several entries match the path the message will be delivered
to one random destination. The sender of the message can specify that local
affinity is preferred, i.e. the message is sent to an actor in the same local actor
system as the used mediator actor, if any such exists, otherwise random to any other
matching entry. A typical usage of this mode is private chat to one other user in
an instant messaging application. It can also be used for distributing tasks to workers,
like a random router.
**2. DistributedPubSubMediator.SendToAll**
The message will be delivered to all recipients with a matching path. Actors with
the same path, without address information, can be registered on different nodes.
On each node there can only be one such actor, since the path is unique within one
local actor system. Typical usage of this mode is to broadcast messages to all replicas
with the same path, e.g. 3 actors on different nodes that all perform the same actions,
for redundancy.
**3. DistributedPubSubMediator.Publish**
Actors may be registered to a named topic instead of path. This enables many subscribers
on each node. The message will be delivered to all subscribers of the topic. For
efficiency the message is sent over the wire only once per node (that has a matching topic),
and then delivered to all subscribers of the local topic representation. This is the
true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
application.
You register actors to the local mediator with ``DistributedPubSubMediator.Put`` or
``DistributedPubSubMediator.Subscribe``. ``Put`` is used together with ``Send`` and
``SendToAll`` message delivery modes. The ``ActorRef`` in ``Put`` must belong to the same
local actor system as the mediator. ``Subscribe`` is used together with ``Publish``.
Actors are automatically removed from the registry when they are terminated, or you
can explicitly remove entries with ``DistributedPubSubMediator.Remove`` or
``DistributedPubSubMediator.Unsubscribe``.
Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with
``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck``
replies.
A Small Example in Java
-----------------------
A subscriber actor:
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#subscriber
Subscriber actors can be started on several nodes in the cluster, and all will receive
messages published to the "content" topic.
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#start-subscribers
A simple actor that publishes to this "content" topic:
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#publisher
It can publish messages to the topic from anywhere in the cluster:
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/DistributedPubSubMediatorTest.java#publish-message
A Small Example in Scala
------------------------
A subscriber actor:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#subscriber
Subscriber actors can be started on several nodes in the cluster, and all will receive
messages published to the "content" topic.
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#start-subscribers
A simple actor that publishes to this "content" topic:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#publisher
It can publish messages to the topic from anywhere in the cluster:
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala#publish-message
DistributedPubSubExtension
--------------------------
In the example above the mediator is started and accessed with the ``akka.contrib.pattern.DistributedPubSubExtension``.
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
start the mediator actor as an ordinary actor and you can have several different mediators at the same
time to be able to divide a large number of actors/topics to different mediators. For example you might
want to use different cluster roles for different mediators.
The ``DistributedPubSubExtension`` can be configured with the following properties:
.. includecode:: @contribSrc@/src/main/resources/reference.conf#pub-sub-ext-config
It is recommended to load the extension when the actor system is started by defining it in
``akka.extensions`` configuration property. Otherwise it will be activated when first used
and then it takes a while for it to be populated.
::
akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]

View file

@ -35,6 +35,7 @@ The Current List of Modules
jul
peek-mailbox
cluster-singleton
distributed-pub-sub
Suggested Way of Using these Contributions
------------------------------------------

View file

@ -0,0 +1,24 @@
######################################
# Akka Contrib Reference Config File #
######################################
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
# //#pub-sub-ext-config
# Settings for the DistributedPubSubExtension
akka.contrib.cluster.pub-sub {
# Actor name of the mediator actor, /user/distributedPubSubMediator
name = distributedPubSubMediator
# Start the mediator on members tagged with this role.
# All members are used if undefined or empty.
role = ""
# How often the DistributedPubSubMediator should send out gossip information
gossip-interval = 1s
# Removed entries are pruned after this duration
removed-time-to-live = 120s
}
# //#pub-sub-ext-config

View file

@ -337,10 +337,8 @@ class ClusterSingletonManager(
val cluster = Cluster(context.system)
val selfAddressOption = Some(cluster.selfAddress)
role match {
case None
case Some(r) require(cluster.selfRoles.contains(r), s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
}
require(role.forall(cluster.selfRoles.contains),
s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
// started when when self member is Up
var leaderChangedBuffer: ActorRef = _

View file

@ -0,0 +1,455 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.net.URLEncoder
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorPath
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
object DistributedPubSubMediator {
@SerialVersionUID(1L)
case class Put(ref: ActorRef)
@SerialVersionUID(1L)
case class Remove(path: String)
@SerialVersionUID(1L)
case class Subscribe(topic: String, ref: ActorRef)
@SerialVersionUID(1L)
case class Unsubscribe(topic: String, ref: ActorRef)
@SerialVersionUID(1L)
case class SubscribeAck(subscribe: Subscribe)
@SerialVersionUID(1L)
case class UnsubscribeAck(unsubscribe: Unsubscribe)
@SerialVersionUID(1L)
case class Send(path: String, msg: Any, localAffinity: Boolean)
@SerialVersionUID(1L)
case class SendToAll(path: String, msg: Any)
@SerialVersionUID(1L)
case class Publish(topic: String, msg: Any)
// Only for testing purposes, to poll/await replication
case object Count
/**
* INTERNAL API
*/
private[pattern] object Internal {
case object Prune
@SerialVersionUID(1L)
case class Bucket(
owner: Address,
version: Long,
content: Map[String, ValueHolder])
@SerialVersionUID(1L)
case class ValueHolder(version: Long, ref: Option[ActorRef])
@SerialVersionUID(1L)
case class Status(versions: Map[Address, Long])
@SerialVersionUID(1L)
case class Delta(buckets: immutable.Iterable[Bucket])
case object GossipTick
def roleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
class Topic(emptyTimeToLive: FiniteDuration) extends Actor {
import context.dispatcher
val pruneInterval: FiniteDuration = emptyTimeToLive / 2
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
var pruneDeadline: Option[Deadline] = None
var subscribers = Set.empty[ActorRef]
override def postStop(): Unit = {
super.postStop()
pruneTask.cancel()
}
def receive = {
case msg @ Subscribe(_, ref)
context watch ref
subscribers += ref
pruneDeadline = None
sender.tell(SubscribeAck(msg), context.parent)
case msg @ Unsubscribe(_, ref)
context unwatch ref
remove(ref)
sender.tell(UnsubscribeAck(msg), context.parent)
case Terminated(ref)
remove(ref)
case Prune
for (d pruneDeadline if d.isOverdue) context stop self
case msg
subscribers foreach { _ forward msg }
}
def remove(ref: ActorRef): Unit =
if (subscribers.contains(ref)) {
subscribers -= ref
if (subscribers.isEmpty)
pruneDeadline = Some(Deadline.now + emptyTimeToLive)
}
}
}
}
/**
* This actor manages a registry of actor references and replicates
* the entries to peer actors among all cluster nodes or a group of nodes
* tagged with a specific role.
*
* The `DistributedPubSubMediator` is supposed to be started on all nodes,
* or all nodes with specified role, in the cluster. The mediator can be
* started with the [[DistributedPubSubExtension]] or as an ordinary actor.
*
* Changes are only performed in the own part of the registry and those changes
* are versioned. Deltas are disseminated in a scalable way to other nodes with
* a gossip protocol. The registry is eventually consistent, i.e. changes are not
* immediately visible at other nodes, but typically they will be fully replicated
* to all other nodes after a few seconds.
*
* You can send messages via the mediator on any node to registered actors on
* any other node. There is three modes of message delivery.
*
* 1. [[DistributedPubSubMediator.Send]] -
* The message will be delivered to one recipient with a matching path, if any such
* exists in the registry. If several entries match the path the message will be delivered
* to one random destination. The sender of the message can specify that local
* affinity is preferred, i.e. the message is sent to an actor in the same local actor
* system as the used mediator actor, if any such exists, otherwise random to any other
* matching entry. A typical usage of this mode is private chat to one other user in
* an instant messaging application. It can also be used for distributing tasks to workers,
* like a random router.
*
* 2. [[DistributedPubSubMediator.SendToAll]] -
* The message will be delivered to all recipients with a matching path. Actors with
* the same path, without address information, can be registered on different nodes.
* On each node there can only be one such actor, since the path is unique within one
* local actor system. Typical usage of this mode is to broadcast messages to all replicas
* with the same path, e.g. 3 actors on different nodes that all perform the same actions,
* for redundancy.
*
* 3. [[DistributedPubSubMediator.Publish]] -
* Actors may be registered to a named topic instead of path. This enables many subscribers
* on each node. The message will be delivered to all subscribers of the topic. For
* efficiency the message is sent over the wire only once per node (that has a matching topic),
* and then delivered to all subscribers of the local topic representation. This is the
* true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
* application.
*
* You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or
* [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and
* `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same
* local actor system as the mediator. `Subscribe` is used together with `Publish`.
* Actors are automatically removed from the registry when they are terminated, or you
* can explicitly remove entries with [[DistributedPubSubMediator.Remove]] or
* [[DistributedPubSubMediator.Unsubscribe]].
*
* Successful `Subscribe` and `Unsubscribe` is acknowledged with
* [[DistributedPubSubMediator.SubscribeAck]] and [[DistributedPubSubMediator.UnsubscribeAck]]
* replies.
*/
class DistributedPubSubMediator(
role: Option[String],
gossipInterval: FiniteDuration = 1.second,
removedTimeToLive: FiniteDuration = 2.minutes)
extends Actor with ActorLogging {
/**
* Java API constructor with default values.
*/
def this(role: String) = this(DistributedPubSubMediator.Internal.roleOption(role))
import DistributedPubSubMediator._
import DistributedPubSubMediator.Internal._
val cluster = Cluster(context.system)
import cluster.selfAddress
require(role.forall(cluster.selfRoles.contains),
s"This cluster member [${selfAddress}] doesn't have the role [$role]")
val removedTimeToLiveMillis = removedTimeToLive.toMillis
//Start periodic gossip to random nodes in cluster
import context.dispatcher
val gossipTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, GossipTick)
val pruneInterval: FiniteDuration = removedTimeToLive / 2
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
var registry: Map[Address, Bucket] = Map.empty.withDefault(a Bucket(a, 0L, Map.empty))
var nodes: Set[Address] = Set.empty
// the version is a timestamp because it is also used when pruning removed entries
val nextVersion = {
var version = 0L
() {
val current = System.currentTimeMillis
version = if (current > version) current else version + 1
version
}
}
override def preStart(): Unit = {
super.preStart()
require(!cluster.isTerminated, "Cluster node must not be terminated")
cluster.subscribe(self, classOf[MemberEvent])
}
override def postStop(): Unit = {
super.postStop()
cluster unsubscribe self
gossipTask.cancel()
pruneTask.cancel()
}
def matchingRole(m: Member): Boolean = role.forall(m.hasRole)
def receive = {
case Send(path, msg, localAffinity)
registry(selfAddress).content.get(path) match {
case Some(ValueHolder(_, Some(ref))) if localAffinity
ref forward msg
case _
val refs = (for {
(_, bucket) registry
valueHolder bucket.content.get(path)
ref valueHolder.ref
} yield ref).toVector
if (refs.nonEmpty)
refs(ThreadLocalRandom.current.nextInt(refs.size)) forward msg
}
case SendToAll(path, msg)
publish(path, msg)
case Publish(topic, msg)
publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg)
case Put(ref: ActorRef)
if (ref.path.address.hasGlobalScope)
log.warning("Registered actor must be local: [{}]", ref)
else {
put(mkKey(ref), Some(ref))
context.watch(ref)
}
case Remove(key)
registry(selfAddress).content.get(key) match {
case Some(ValueHolder(_, Some(ref)))
context.unwatch(ref)
put(key, None)
case _
}
case msg @ Subscribe(topic, _)
// each topic is managed by a child actor with the same name as the topic
val encTopic = URLEncoder.encode(topic, "utf-8")
context.child(encTopic) match {
case Some(t) t forward msg
case None
val t = context.actorOf(Props(new Topic(removedTimeToLive)), name = encTopic)
t forward msg
put(mkKey(t), Some(t))
context.watch(t)
}
case msg @ Unsubscribe(topic, _)
context.child(URLEncoder.encode(topic, "utf-8")) match {
case Some(g) g ! msg
case None // no such topic here
}
case Status(otherVersions)
// gossip chat starts with a Status message, containing the bucket versions of the other node
val delta = collectDelta(otherVersions)
if (delta.nonEmpty)
sender ! Delta(delta)
if (otherHasNewerVersions(otherVersions))
sender ! Status(versions = myVersions) // it will reply with Delta
case Delta(buckets)
// reply from Status message in the gossip chat
// the Delta contains potential updates (newer versions) from the other node
if (nodes.contains(sender.path.address)) {
buckets foreach { b
val myBucket = registry(b.owner)
if (b.version > myBucket.version) {
registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content))
}
}
}
case GossipTick gossip()
case Prune prune()
case Terminated(a)
val key = mkKey(a)
registry(selfAddress).content.get(key) match {
case Some(ValueHolder(_, Some(`a`)))
// remove
put(key, None)
case _
}
case state: CurrentClusterState
nodes = state.members.collect {
case m if m.status != MemberStatus.Joining && matchingRole(m) m.address
}
case MemberUp(m)
if (matchingRole(m))
nodes += m.address
case MemberRemoved(m)
if (m.address == selfAddress)
context stop self
else if (matchingRole(m)) {
nodes -= m.address
registry -= m.address
}
case _: MemberEvent // not of interest
case Count
val count = registry.map {
case (owner, bucket) bucket.content.count {
case (_, valueHolder) valueHolder.ref.isDefined
}
}.sum
sender ! count
}
def publish(path: String, msg: Any): Unit = {
for {
(_, bucket) registry
valueHolder bucket.content.get(path)
ref valueHolder.ref
} ref forward msg
}
def put(key: String, valueOption: Option[ActorRef]): Unit = {
val bucket = registry(selfAddress)
val v = nextVersion()
registry += (selfAddress -> bucket.copy(version = v,
content = bucket.content + (key -> ValueHolder(v, valueOption))))
}
def mkKey(ref: ActorRef): String = mkKey(ref.path)
def mkKey(path: ActorPath): String = path.elements.mkString("/", "/", "")
def myVersions: Map[Address, Long] = registry.map { case (owner, bucket) (owner -> bucket.version) }
def collectDelta(otherVersions: Map[Address, Long]): immutable.Iterable[Bucket] = {
// missing entries are represented by version 0
val filledOtherVersions = myVersions.map { case (k, _) k -> 0L } ++ otherVersions
filledOtherVersions.collect {
case (owner, v) if registry(owner).version > v
val bucket = registry(owner)
val deltaContent = bucket.content.filter {
case (_, value) value.version > v
}
bucket.copy(content = deltaContent)
}
}
def otherHasNewerVersions(otherVersions: Map[Address, Long]): Boolean =
otherVersions.exists {
case (owner, v) v > registry(owner).version
}
/**
* Gossip to peer nodes.
*/
def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo
def gossipTo(address: Address): Unit = {
context.actorSelection(self.path.toStringWithAddress(address)) ! Status(versions = myVersions)
}
def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
def prune(): Unit = {
registry foreach {
case (owner, bucket)
val oldRemoved = bucket.content.collect {
case (key, ValueHolder(version, None)) if (bucket.version - version > removedTimeToLiveMillis) key
}
if (oldRemoved.nonEmpty)
registry += owner -> bucket.copy(content = bucket.content -- oldRemoved)
}
}
}
/**
* Extension that starts a [[DistributedPubSubMediator]] actor
* with settings defined in config section `akka.contrib.cluster.pub-sub`.
*/
object DistributedPubSubExtension extends ExtensionId[DistributedPubSubExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): DistributedPubSubExtension = super.get(system)
override def lookup = DistributedPubSubExtension
override def createExtension(system: ExtendedActorSystem): DistributedPubSubExtension =
new DistributedPubSubExtension(system)
}
class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension {
private val config = system.settings.config.getConfig("akka.contrib.cluster.pub-sub")
private val role: Option[String] = config.getString("role") match {
case "" None
case r Some(r)
}
/**
* Returns true if this member is not tagged with the role configured for the
* mediator.
*/
def isTerminated: Boolean = Cluster(system).isTerminated || !role.forall(Cluster(system).selfRoles.contains)
/**
* The [[DistributedPubSubMediator]]
*/
val mediator: ActorRef = {
if (isTerminated)
system.deadLetters
else {
val gossipInterval = Duration(config.getMilliseconds("gossip-interval"), MILLISECONDS)
val removedTimeToLive = Duration(config.getMilliseconds("removed-time-to-live"), MILLISECONDS)
val name = config.getString("name")
system.actorOf(Props(new DistributedPubSubMediator(role, gossipInterval, removedTimeToLive)),
name)
}
}
}

View file

@ -0,0 +1,322 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.actor.ActorLogging
object DistributedPubSubMediatorSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-join = off
akka.cluster.auto-down = on
"""))
object TestChatUser {
case class Whisper(path: String, msg: Any)
case class Talk(path: String, msg: Any)
case class Shout(topic: String, msg: Any)
}
class TestChatUser(mediator: ActorRef, testActor: ActorRef) extends Actor {
import TestChatUser._
import DistributedPubSubMediator._
def receive = {
case Whisper(path, msg) mediator ! Send(path, msg, localAffinity = true)
case Talk(path, msg) mediator ! SendToAll(path, msg)
case Shout(topic, msg) mediator ! Publish(topic, msg)
case msg testActor ! msg
}
}
//#publisher
class Publisher extends Actor {
import DistributedPubSubMediator.Publish
// activate the extension
val mediator = DistributedPubSubExtension(context.system).mediator
def receive = {
case in: String
val out = in.toUpperCase
mediator ! Publish("content", out)
}
}
//#publisher
//#subscriber
class Subscriber extends Actor with ActorLogging {
import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
val mediator = DistributedPubSubExtension(context.system).mediator
// subscribe to the topic named "content"
mediator ! Subscribe("content", self)
def receive = {
case SubscribeAck(Subscribe("content", `self`))
context become ready
}
def ready: Actor.Receive = {
case s: String
log.info("Got {}", s)
}
}
//#subscriber
}
class DistributedPubSubMediatorMultiJvmNode1 extends DistributedPubSubMediatorSpec
class DistributedPubSubMediatorMultiJvmNode2 extends DistributedPubSubMediatorSpec
class DistributedPubSubMediatorMultiJvmNode3 extends DistributedPubSubMediatorSpec
class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMediatorSpec) with STMultiNodeSpec with ImplicitSender {
import DistributedPubSubMediatorSpec._
import DistributedPubSubMediatorSpec.TestChatUser._
import DistributedPubSubMediator._
override def initialParticipants = roles.size
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
createMediator()
}
enterBarrier(from.name + "-joined")
}
def createMediator(): ActorRef = DistributedPubSubExtension(system).mediator
def mediator: ActorRef = DistributedPubSubExtension(system).mediator
var chatUsers: Map[String, ActorRef] = Map.empty
def createChatUser(name: String): ActorRef = {
var a = system.actorOf(Props(new TestChatUser(mediator, testActor)), name)
chatUsers += (name -> a)
a
}
def chatUser(name: String): ActorRef = chatUsers(name)
def awaitCount(expected: Int): Unit = {
awaitAssert {
mediator ! Count
expectMsgType[Int] must be(expected)
}
}
"A DistributedPubSubMediator" must {
"startup 2 node cluster" in within(15 seconds) {
join(first, first)
join(second, first)
enterBarrier("after-1")
}
"keep track of added users" in within(15 seconds) {
runOn(first) {
val u1 = createChatUser("u1")
mediator ! Put(u1)
val u2 = createChatUser("u2")
mediator ! Put(u2)
awaitCount(2)
// send to actor at same node
u1 ! Whisper("/user/u2", "hello")
expectMsg("hello")
lastSender must be(u2)
}
runOn(second) {
val u3 = createChatUser("u3")
mediator ! Put(u3)
}
runOn(first, second) {
awaitCount(3)
}
enterBarrier("3-registered")
runOn(second) {
val u4 = createChatUser("u4")
mediator ! Put(u4)
}
runOn(first, second) {
awaitCount(4)
}
enterBarrier("4-registered")
runOn(first) {
// send to actor on another node
chatUser("u1") ! Whisper("/user/u4", "hi there")
}
runOn(second) {
expectMsg("hi there")
lastSender.path.name must be("u4")
}
enterBarrier("after-2")
}
"replicate users to new node" in within(20 seconds) {
join(third, first)
runOn(third) {
val u5 = createChatUser("u5")
mediator ! Put(u5)
}
awaitCount(5)
enterBarrier("5-registered")
runOn(third) {
chatUser("u5") ! Whisper("/user/u4", "go")
}
runOn(second) {
expectMsg("go")
lastSender.path.name must be("u4")
}
enterBarrier("after-3")
}
"keep track of removed users" in within(15 seconds) {
runOn(first) {
val u6 = createChatUser("u6")
mediator ! Put(u6)
}
awaitCount(6)
enterBarrier("6-registered")
runOn(first) {
mediator ! Remove("/user/u6")
}
awaitCount(5)
enterBarrier("after-4")
}
"remove terminated users" in within(5 seconds) {
runOn(second) {
chatUser("u3") ! PoisonPill
}
awaitCount(4)
enterBarrier("after-5")
}
"publish" in within(15 seconds) {
runOn(first, second) {
val u7 = createChatUser("u7")
mediator ! Put(u7)
}
awaitCount(6)
enterBarrier("7-registered")
runOn(third) {
chatUser("u5") ! Talk("/user/u7", "hi")
}
runOn(first, second) {
expectMsg("hi")
lastSender.path.name must be("u7")
}
runOn(third) {
expectNoMsg(2.seconds)
}
enterBarrier("after-6")
}
"publish to topic" in within(15 seconds) {
runOn(first) {
val s8 = Subscribe("topic1", createChatUser("u8"))
mediator ! s8
expectMsg(SubscribeAck(s8))
val s9 = Subscribe("topic1", createChatUser("u9"))
mediator ! s9
expectMsg(SubscribeAck(s9))
}
runOn(second) {
val s10 = Subscribe("topic1", createChatUser("u10"))
mediator ! s10
expectMsg(SubscribeAck(s10))
}
// one topic on two nodes
awaitCount(8)
enterBarrier("topic1-registered")
runOn(third) {
chatUser("u5") ! Shout("topic1", "hello all")
}
runOn(first) {
val names = receiveWhile(messages = 2) {
case "hello all" lastSender.path.name
}
names.toSet must be(Set("u8", "u9"))
}
runOn(second) {
expectMsg("hello all")
lastSender.path.name must be("u10")
}
runOn(third) {
expectNoMsg(2.seconds)
}
enterBarrier("after-7")
}
"demonstrate usage" in within(15 seconds) {
def later(): Unit = {
awaitCount(10)
}
//#start-subscribers
runOn(first) {
system.actorOf(Props[Subscriber], "subscriber1")
}
runOn(second) {
system.actorOf(Props[Subscriber], "subscriber2")
system.actorOf(Props[Subscriber], "subscriber3")
}
//#start-subscribers
//#publish-message
runOn(third) {
val publisher = system.actorOf(Props[Publisher], "publisher")
later()
// after a while the subscriptions are replicated
publisher ! "hello"
}
//#publish-message
enterBarrier("after-8")
}
}
}

View file

@ -0,0 +1,92 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class DistributedPubSubMediatorTest {
private static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create();
}
@AfterClass
public static void teardown() {
system.shutdown();
}
@Test
public void demonstrateUsage() {
//#start-subscribers
system.actorOf(new Props(Subscriber.class), "subscriber1");
//another node
system.actorOf(new Props(Subscriber.class), "subscriber2");
system.actorOf(new Props(Subscriber.class), "subscriber3");
//#start-subscribers
//#publish-message
//somewhere else
ActorRef publisher = system.actorOf(new Props(Publisher.class), "publisher");
// after a while the subscriptions are replicated
publisher.tell("hello", null);
//#publish-message
}
static//#subscriber
public class Subscriber extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public Subscriber() {
ActorRef mediator =
DistributedPubSubExtension.get(getContext().system()).mediator();
// subscribe to the topic named "content"
mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()),
getSelf());
}
public void onReceive(Object msg) {
if (msg instanceof String)
log.info("Got: {}", msg);
else if (msg instanceof DistributedPubSubMediator.SubscribeAck)
log.info("subscribing");
else
unhandled(msg);
}
}
//#subscriber
static//#publisher
public class Publisher extends UntypedActor {
// activate the extension
ActorRef mediator =
DistributedPubSubExtension.get(getContext().system()).mediator();
public void onReceive(Object msg) {
if (msg instanceof String) {
String in = (String) msg;
String out = in.toUpperCase();
mediator.tell(new DistributedPubSubMediator.Publish("content", out),
getSelf());
} else {
unhandled(msg);
}
}
}
//#publisher
}

View file

@ -269,6 +269,11 @@ events, but there are several corner cases to consider. Therefore, this specific
case is made easily accessible by the :ref:`cluster-singleton` in the contrib module.
You can use it as is, or adjust to fit your specific needs.
Distributed Publish Subscribe Pattern
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
See :ref:`distributed-pub-sub` in the contrib module.
Failure Detector
^^^^^^^^^^^^^^^^

View file

@ -257,6 +257,11 @@ events, but there are several corner cases to consider. Therefore, this specific
case is made easily accessible by the :ref:`cluster-singleton` in the contrib module.
You can use it as is, or adjust to fit your specific needs.
Distributed Publish Subscribe Pattern
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
See :ref:`distributed-pub-sub` in the contrib module.
Failure Detector
^^^^^^^^^^^^^^^^