Merge pull request #2115 from cowboy129/master

+con #3972 Make Distributedpubsubmediator support consumer group
This commit is contained in:
Patrik Nordwall 2014-04-16 11:00:30 +02:00
commit 2ff0f1d004
5 changed files with 227 additions and 45 deletions

View file

@ -23,7 +23,7 @@ immediately visible at other nodes, but typically they will be fully replicated
to all other nodes after a few seconds.
You can send messages via the mediator on any node to registered actors on
any other node. There is three modes of message delivery.
any other node. There is four modes of message delivery.
**1. DistributedPubSubMediator.Send**
@ -56,6 +56,17 @@ and then delivered to all subscribers of the local topic representation. This is
true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
application.
**4. DistributedPubSubMediator.Publish with sendOneMessageToEachGroup**
Actors may be subscribed to a named topic with an optional property (``group``).
If subscribing with a group name, each message published to a topic with the
(``sendOneMessageToEachGroup``) flag is delivered via the supplied ``RoutingLogic``
(default random) to one actor within each subscribing group.
If all the subscribed actors have the same group name, then this works just like
``Send`` and all messages are delivered to one subscriber.
If all the subscribed actors have different group names, then this works like
normal ``Publish`` and all messages are broadcast to all subscribers.
You register actors to the local mediator with ``DistributedPubSubMediator.Put`` or
``DistributedPubSubMediator.Subscribe``. ``Put`` is used together with ``Send`` and
``SendToAll`` message delivery modes. The ``ActorRef`` in ``Put`` must belong to the same

View file

@ -32,6 +32,7 @@ import akka.routing.Router
import akka.routing.RoundRobinRoutingLogic
import akka.routing.ConsistentHashingRoutingLogic
import akka.routing.BroadcastRoutingLogic
import scala.collection.immutable.TreeMap
object DistributedPubSubMediator {
@ -65,15 +66,37 @@ object DistributedPubSubMediator {
@SerialVersionUID(1L) final case class Put(ref: ActorRef)
@SerialVersionUID(1L) final case class Remove(path: String)
@SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef) {
@SerialVersionUID(1L) final case class Subscribe(topic: String, group: Option[String], ref: ActorRef) {
require(topic != null && topic != "", "topic must be defined")
/**
* Convenience constructor with `group` None
*/
def this(topic: String, ref: ActorRef) = this(topic, None, ref)
/**
* Java API: constructor with group: String
*/
def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
}
@SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef) {
object Subscribe {
def apply(topic: String, ref: ActorRef) = new Subscribe(topic, ref)
}
@SerialVersionUID(1L) final case class Unsubscribe(topic: String, group: Option[String], ref: ActorRef) {
require(topic != null && topic != "", "topic must be defined")
def this(topic: String, ref: ActorRef) = this(topic, None, ref)
def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
}
object Unsubscribe {
def apply(topic: String, ref: ActorRef) = new Unsubscribe(topic, ref)
}
@SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe)
@SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe)
@SerialVersionUID(1L) final case class Publish(topic: String, msg: Any) extends DistributedPubSubMessage
@SerialVersionUID(1L) final case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean) extends DistributedPubSubMessage {
def this(topic: String, msg: Any) = this(topic, msg, sendOneMessageToEachGroup = false)
}
object Publish {
def apply(topic: String, msg: Any) = new Publish(topic, msg)
}
@SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean) extends DistributedPubSubMessage {
/**
* Convenience constructor with `localAffinity` false
@ -97,7 +120,7 @@ object DistributedPubSubMediator {
final case class Bucket(
owner: Address,
version: Long,
content: Map[String, ValueHolder])
content: TreeMap[String, ValueHolder])
@SerialVersionUID(1L)
final case class ValueHolder(version: Long, ref: Option[ActorRef]) {
@ -111,12 +134,23 @@ object DistributedPubSubMediator {
case object GossipTick
@SerialVersionUID(1L)
final case class RegisterTopic(topicRef: ActorRef)
@SerialVersionUID(1L)
final case class Subscribed(ack: SubscribeAck, subscriber: ActorRef)
@SerialVersionUID(1L)
final case class Unsubscribed(ack: UnsubscribeAck, subscriber: ActorRef)
@SerialVersionUID(1L)
final case class SendToOneSubscriber(msg: Any)
def roleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
class Topic(emptyTimeToLive: FiniteDuration) extends Actor {
def encName(s: String) = URLEncoder.encode(s, "utf-8")
trait TopicLike extends Actor {
import context.dispatcher
val pruneInterval: FiniteDuration = emptyTimeToLive / 2
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
@ -124,21 +158,23 @@ object DistributedPubSubMediator {
var subscribers = Set.empty[ActorRef]
val emptyTimeToLive: FiniteDuration
override def postStop(): Unit = {
super.postStop()
pruneTask.cancel()
}
def receive = {
case msg @ Subscribe(_, ref)
def defaultReceive: Receive = {
case msg @ Subscribe(_, _, ref)
context watch ref
subscribers += ref
pruneDeadline = None
sender().tell(SubscribeAck(msg), context.parent)
case msg @ Unsubscribe(_, ref)
context.parent ! Subscribed(SubscribeAck(msg), sender())
case msg @ Unsubscribe(_, _, ref)
context unwatch ref
remove(ref)
sender().tell(UnsubscribeAck(msg), context.parent)
context.parent ! Unsubscribed(UnsubscribeAck(msg), sender())
case Terminated(ref)
remove(ref)
case Prune
@ -147,13 +183,49 @@ object DistributedPubSubMediator {
subscribers foreach { _ forward msg }
}
def remove(ref: ActorRef): Unit =
if (subscribers.contains(ref)) {
subscribers -= ref
if (subscribers.isEmpty)
pruneDeadline = Some(Deadline.now + emptyTimeToLive)
}
def business: Receive
def receive = business orElse defaultReceive
def remove(ref: ActorRef): Unit = {
if (subscribers.contains(ref))
subscribers -= ref
if (subscribers.isEmpty && context.children.isEmpty)
pruneDeadline = Some(Deadline.now + emptyTimeToLive)
}
}
class Topic(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike {
def business = {
case msg @ Subscribe(_, Some(group), _)
val encGroup = encName(group)
context.child(encGroup) match {
case Some(g) g forward msg
case None
val g = context.actorOf(Props(classOf[Group], emptyTimeToLive, routingLogic), name = encGroup)
g forward msg
context watch g
context.parent ! RegisterTopic(g)
}
pruneDeadline = None
case msg @ Unsubscribe(_, Some(group), _)
context.child(encName(group)) match {
case Some(g) g forward msg
case None // no such group here
}
case msg: Subscribed
context.parent forward msg
case msg: Unsubscribed
context.parent forward msg
}
}
class Group(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike {
def business = {
case SendToOneSubscriber(msg)
if (subscribers.nonEmpty)
Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(msg, sender())
}
}
}
}
@ -208,6 +280,16 @@ trait DistributedPubSubMessage extends Serializable
* true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
* application.
*
* 4. [[DistributedPubSubMediator.Publish]] with sendOneMessageToEachGroup -
* Actors may be subscribed to a named topic with an optional property `group`.
* If subscribing with a group name, each message published to a topic with the
* `sendOneMessageToEachGroup` flag is delivered via the supplied `routingLogic`
* (default random) to one actor within each subscribing group.
* If all the subscribed actors have the same group name, then this works just like
* [[DistributedPubSubMediator.Send]] and all messages are delivered to one subscribe.
* If all the subscribed actors have different group names, then this works like normal
* [[DistributedPubSubMediator.Publish]] and all messages are broadcast to all subscribers.
*
* You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or
* [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and
* `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same
@ -245,7 +327,7 @@ class DistributedPubSubMediator(
val pruneInterval: FiniteDuration = removedTimeToLive / 2
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
var registry: Map[Address, Bucket] = Map.empty.withDefault(a Bucket(a, 0L, Map.empty))
var registry: Map[Address, Bucket] = Map.empty.withDefault(a Bucket(a, 0L, TreeMap.empty))
var nodes: Set[Address] = Set.empty
// the version is a timestamp because it is also used when pruning removed entries
@ -293,8 +375,11 @@ class DistributedPubSubMediator(
case SendToAll(path, msg, skipSenderNode)
publish(path, msg, skipSenderNode)
case Publish(topic, msg)
publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg)
case Publish(topic, msg, sendOneMessageToEachGroup)
if (sendOneMessageToEachGroup)
publishToEachGroup(mkKey(self.path / encName(topic)), msg)
else
publish(mkKey(self.path / encName(topic)), msg)
case Put(ref: ActorRef)
if (ref.path.address.hasGlobalScope)
@ -312,24 +397,32 @@ class DistributedPubSubMediator(
case _
}
case msg @ Subscribe(topic, _)
case msg @ Subscribe(topic, _, _)
// each topic is managed by a child actor with the same name as the topic
val encTopic = URLEncoder.encode(topic, "utf-8")
val encTopic = encName(topic)
context.child(encTopic) match {
case Some(t) t forward msg
case None
val t = context.actorOf(Props(classOf[Topic], removedTimeToLive), name = encTopic)
val t = context.actorOf(Props(classOf[Topic], removedTimeToLive, routingLogic), name = encTopic)
t forward msg
put(mkKey(t), Some(t))
context.watch(t)
registerTopic(t)
}
case msg @ Unsubscribe(topic, _)
context.child(URLEncoder.encode(topic, "utf-8")) match {
case Some(g) g forward msg
case msg @ RegisterTopic(t)
registerTopic(t)
case msg @ Subscribed(ack, ref)
ref ! ack
case msg @ Unsubscribe(topic, _, _)
context.child(encName(topic)) match {
case Some(t) t forward msg
case None // no such topic here
}
case msg @ Unsubscribed(ack, ref)
ref ! ack
case Status(otherVersions)
// gossip chat starts with a Status message, containing the bucket versions of the other node
val delta = collectDelta(otherVersions)
@ -404,6 +497,25 @@ class DistributedPubSubMediator(
} ref forward msg
}
def publishToEachGroup(path: String, msg: Any): Unit = {
val prefix = path + '/'
val lastKey = path + '0' // '0' is the next char of '/'
val groups = (for {
(_, bucket) registry.toSeq
key bucket.content.range(prefix, lastKey).keys
valueHolder bucket.content.get(key)
ref valueHolder.routee
} yield (key, ref)).groupBy(_._1).values
val wrappedMsg = SendToOneSubscriber(msg)
groups foreach {
group
val routees = group.map(_._2).toVector
if (routees.nonEmpty)
Router(routingLogic, routees).route(wrappedMsg, sender())
}
}
def put(key: String, valueOption: Option[ActorRef]): Unit = {
val bucket = registry(selfAddress)
val v = nextVersion()
@ -411,6 +523,11 @@ class DistributedPubSubMediator(
content = bucket.content + (key -> ValueHolder(v, valueOption))))
}
def registerTopic(ref: ActorRef): Unit = {
put(mkKey(ref), Some(ref))
context.watch(ref)
}
def mkKey(ref: ActorRef): String = mkKey(ref.path)
def mkKey(path: ActorPath): String = path.toStringWithoutAddress
@ -434,7 +551,7 @@ class DistributedPubSubMediator(
// exceeded the maxDeltaElements, pick the elements with lowest versions
val sortedContent = deltaContent.toVector.sortBy(_._2.version)
val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size))
bucket.copy(content = chunk.toMap, version = chunk.last._2.version)
bucket.copy(content = TreeMap.empty[String, ValueHolder] ++ chunk, version = chunk.last._2.version)
}
}
}

View file

@ -26,6 +26,7 @@ import akka.contrib.pattern.DistributedPubSubMediator.Internal._
import akka.serialization.Serialization
import akka.actor.ActorRef
import akka.serialization.SerializationExtension
import scala.collection.immutable.TreeMap
/**
* Protobuf serializer of DistributedPubSubMediator messages.
@ -137,7 +138,7 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend
private def deltaFromProto(delta: dm.Delta): Delta =
Delta(delta.getBucketsList.asScala.toVector.map { b
val content: Map[String, ValueHolder] = b.getContentList.asScala.map { entry
val content: TreeMap[String, ValueHolder] = b.getContentList.asScala.map { entry
entry.getKey -> ValueHolder(entry.getVersion, if (entry.hasRef) Some(resolveActorRef(entry.getRef)) else None)
}(breakOut)
Bucket(addressFromProto(b.getOwner), b.getVersion, content)

View file

@ -39,6 +39,9 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
final case class Talk(path: String, msg: Any)
final case class TalkToOthers(path: String, msg: Any)
final case class Shout(topic: String, msg: Any)
final case class ShoutToGroups(topic: String, msg: Any)
final case class JoinGroup(topic: String, group: String)
final case class ExitGroup(topic: String, group: String)
}
class TestChatUser(mediator: ActorRef, testActor: ActorRef) extends Actor {
@ -46,11 +49,14 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
import DistributedPubSubMediator._
def receive = {
case Whisper(path, msg) mediator ! Send(path, msg, localAffinity = true)
case Talk(path, msg) mediator ! SendToAll(path, msg)
case TalkToOthers(path, msg) mediator ! SendToAll(path, msg, allButSelf = true)
case Shout(topic, msg) mediator ! Publish(topic, msg)
case msg testActor ! msg
case Whisper(path, msg) mediator ! Send(path, msg, localAffinity = true)
case Talk(path, msg) mediator ! SendToAll(path, msg)
case TalkToOthers(path, msg) mediator ! SendToAll(path, msg, allButSelf = true)
case Shout(topic, msg) mediator ! Publish(topic, msg)
case ShoutToGroups(topic, msg) mediator ! Publish(topic, msg, true)
case JoinGroup(topic, group) mediator ! Subscribe(topic, Some(group), self)
case ExitGroup(topic, group) mediator ! Unsubscribe(topic, Some(group), self)
case msg testActor ! msg
}
}
@ -76,7 +82,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
mediator ! Subscribe("content", self)
def receive = {
case SubscribeAck(Subscribe("content", `self`))
case SubscribeAck(Subscribe("content", None, `self`))
context become ready
}
@ -345,6 +351,52 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
enterBarrier("after-11")
}
"send one message to each group" in within(20 seconds) {
runOn(first) {
val u12 = createChatUser("u12")
u12 ! JoinGroup("topic2", "group1")
expectMsg(SubscribeAck(Subscribe("topic2", Some("group1"), u12)))
}
runOn(second) {
val u12 = createChatUser("u12")
u12 ! JoinGroup("topic2", "group2")
expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u12)))
val u13 = createChatUser("u13")
u13 ! JoinGroup("topic2", "group2")
expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u13)))
}
awaitCount(17)
enterBarrier("12-registered")
runOn(first) {
chatUser("u12") ! ShoutToGroups("topic2", "hi")
}
runOn(first, second) {
expectMsg("hi")
expectNoMsg(2.seconds) // each group receive only one message
}
enterBarrier("12-published")
runOn(first) {
val u12 = chatUser("u12")
u12 ! ExitGroup("topic2", "group1")
expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group1"), u12)))
}
runOn(second) {
val u12 = chatUser("u12")
u12 ! ExitGroup("topic2", "group2")
expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u12)))
val u13 = chatUser("u13")
u13 ! ExitGroup("topic2", "group2")
expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u13)))
}
enterBarrier("after-12")
}
"transfer delta correctly" in {
val firstAddress = node(first).address
val secondAddress = node(second).address
@ -354,8 +406,8 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
mediator ! Status(versions = Map.empty)
val deltaBuckets = expectMsgType[Delta].buckets
deltaBuckets.size should be(3)
deltaBuckets.find(_.owner == firstAddress).get.content.size should be(7)
deltaBuckets.find(_.owner == secondAddress).get.content.size should be(6)
deltaBuckets.find(_.owner == firstAddress).get.content.size should be(9)
deltaBuckets.find(_.owner == secondAddress).get.content.size should be(8)
deltaBuckets.find(_.owner == thirdAddress).get.content.size should be(2)
}
enterBarrier("verified-initial-delta")
@ -377,15 +429,15 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
mediator ! Status(versions = deltaBuckets2.map(b b.owner -> b.version).toMap)
val deltaBuckets3 = expectMsgType[Delta].buckets
deltaBuckets3.map(_.content.size).sum should be(7 + 6 + 2 + many - 500 - 500)
deltaBuckets3.map(_.content.size).sum should be(9 + 8 + 2 + many - 500 - 500)
}
enterBarrier("verified-delta-with-many")
within(10.seconds) {
awaitCount(13 + many)
awaitCount(17 + many)
}
enterBarrier("after-12")
enterBarrier("after-13")
}
"remove entries when node is removed" in within(30 seconds) {
@ -403,7 +455,7 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
awaitCount(countBefore - 2)
}
enterBarrier("after-13")
enterBarrier("after-14")
}
"receive proper unsubscribeAck message" in within(15 seconds) {

View file

@ -8,6 +8,7 @@ import akka.testkit.AkkaSpec
import akka.contrib.pattern.DistributedPubSubMediator._
import akka.contrib.pattern.DistributedPubSubMediator.Internal._
import akka.actor.Props
import scala.collection.immutable.TreeMap
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DistributedPubSubMessageSerializerSpec extends AkkaSpec {
@ -32,9 +33,9 @@ class DistributedPubSubMessageSerializerSpec extends AkkaSpec {
val u4 = system.actorOf(Props.empty, "u4")
checkSerialization(Status(Map(address1 -> 3, address2 -> 17, address3 -> 5)))
checkSerialization(Delta(List(
Bucket(address1, 3, Map("/user/u1" -> ValueHolder(2, Some(u1)), "/user/u2" -> ValueHolder(3, Some(u2)))),
Bucket(address2, 17, Map("/user/u3" -> ValueHolder(17, Some(u3)))),
Bucket(address3, 5, Map("/user/u4" -> ValueHolder(4, Some(u4)), "/user/u5" -> ValueHolder(5, None))))))
Bucket(address1, 3, TreeMap("/user/u1" -> ValueHolder(2, Some(u1)), "/user/u2" -> ValueHolder(3, Some(u2)))),
Bucket(address2, 17, TreeMap("/user/u3" -> ValueHolder(17, Some(u3)))),
Bucket(address3, 5, TreeMap("/user/u4" -> ValueHolder(4, Some(u4)), "/user/u5" -> ValueHolder(5, None))))))
checkSerialization(Send("/user/u3", "hello", localAffinity = true))
checkSerialization(SendToAll("/user/u3", "hello", allButSelf = true))
checkSerialization(Publish("mytopic", "hello"))