From f421e4260b7871799eded8d0e5d671d3aa5cc109 Mon Sep 17 00:00:00 2001 From: Xingrun CHEN Date: Fri, 4 Apr 2014 12:10:41 +0800 Subject: [PATCH] +con #3972 Make Distributedpubsubmediator support consumer group 1. allow Topic have children topics, which are the groups 2. when publish with sendOneMessageToEachGroup flag, it will send to one actor each group --- akka-contrib/docs/distributed-pub-sub.rst | 13 +- .../pattern/DistributedPubSubMediator.scala | 173 +++++++++++++++--- .../DistributedPubSubMessageSerializer.scala | 3 +- .../DistributedPubSubMediatorSpec.scala | 76 ++++++-- ...stributedPubSubMessageSerializerSpec.scala | 7 +- 5 files changed, 227 insertions(+), 45 deletions(-) diff --git a/akka-contrib/docs/distributed-pub-sub.rst b/akka-contrib/docs/distributed-pub-sub.rst index 001ae78747..34c0adbfb3 100644 --- a/akka-contrib/docs/distributed-pub-sub.rst +++ b/akka-contrib/docs/distributed-pub-sub.rst @@ -23,7 +23,7 @@ immediately visible at other nodes, but typically they will be fully replicated to all other nodes after a few seconds. You can send messages via the mediator on any node to registered actors on -any other node. There is three modes of message delivery. +any other node. There is four modes of message delivery. **1. DistributedPubSubMediator.Send** @@ -56,6 +56,17 @@ and then delivered to all subscribers of the local topic representation. This is true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging application. +**4. DistributedPubSubMediator.Publish with sendOneMessageToEachGroup** + +Actors may be subscribed to a named topic with an optional property (``group``). +If subscribing with a group name, each message published to a topic with the +(``sendOneMessageToEachGroup``) flag is delivered via the supplied ``RoutingLogic`` +(default random) to one actor within each subscribing group. +If all the subscribed actors have the same group name, then this works just like +``Send`` and all messages are delivered to one subscriber. +If all the subscribed actors have different group names, then this works like +normal ``Publish`` and all messages are broadcast to all subscribers. + You register actors to the local mediator with ``DistributedPubSubMediator.Put`` or ``DistributedPubSubMediator.Subscribe``. ``Put`` is used together with ``Send`` and ``SendToAll`` message delivery modes. The ``ActorRef`` in ``Put`` must belong to the same diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 459bfc2743..27b161cce8 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -32,6 +32,7 @@ import akka.routing.Router import akka.routing.RoundRobinRoutingLogic import akka.routing.ConsistentHashingRoutingLogic import akka.routing.BroadcastRoutingLogic +import scala.collection.immutable.TreeMap object DistributedPubSubMediator { @@ -65,15 +66,37 @@ object DistributedPubSubMediator { @SerialVersionUID(1L) final case class Put(ref: ActorRef) @SerialVersionUID(1L) final case class Remove(path: String) - @SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef) { + @SerialVersionUID(1L) final case class Subscribe(topic: String, group: Option[String], ref: ActorRef) { require(topic != null && topic != "", "topic must be defined") + /** + * Convenience constructor with `group` None + */ + def this(topic: String, ref: ActorRef) = this(topic, None, ref) + + /** + * Java API: constructor with group: String + */ + def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref) } - @SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef) { + object Subscribe { + def apply(topic: String, ref: ActorRef) = new Subscribe(topic, ref) + } + @SerialVersionUID(1L) final case class Unsubscribe(topic: String, group: Option[String], ref: ActorRef) { require(topic != null && topic != "", "topic must be defined") + def this(topic: String, ref: ActorRef) = this(topic, None, ref) + def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref) + } + object Unsubscribe { + def apply(topic: String, ref: ActorRef) = new Unsubscribe(topic, ref) } @SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe) @SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe) - @SerialVersionUID(1L) final case class Publish(topic: String, msg: Any) extends DistributedPubSubMessage + @SerialVersionUID(1L) final case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean) extends DistributedPubSubMessage { + def this(topic: String, msg: Any) = this(topic, msg, sendOneMessageToEachGroup = false) + } + object Publish { + def apply(topic: String, msg: Any) = new Publish(topic, msg) + } @SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean) extends DistributedPubSubMessage { /** * Convenience constructor with `localAffinity` false @@ -97,7 +120,7 @@ object DistributedPubSubMediator { final case class Bucket( owner: Address, version: Long, - content: Map[String, ValueHolder]) + content: TreeMap[String, ValueHolder]) @SerialVersionUID(1L) final case class ValueHolder(version: Long, ref: Option[ActorRef]) { @@ -111,12 +134,23 @@ object DistributedPubSubMediator { case object GossipTick + @SerialVersionUID(1L) + final case class RegisterTopic(topicRef: ActorRef) + @SerialVersionUID(1L) + final case class Subscribed(ack: SubscribeAck, subscriber: ActorRef) + @SerialVersionUID(1L) + final case class Unsubscribed(ack: UnsubscribeAck, subscriber: ActorRef) + @SerialVersionUID(1L) + final case class SendToOneSubscriber(msg: Any) + def roleOption(role: String): Option[String] = role match { case null | "" ⇒ None case _ ⇒ Some(role) } - class Topic(emptyTimeToLive: FiniteDuration) extends Actor { + def encName(s: String) = URLEncoder.encode(s, "utf-8") + + trait TopicLike extends Actor { import context.dispatcher val pruneInterval: FiniteDuration = emptyTimeToLive / 2 val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune) @@ -124,21 +158,23 @@ object DistributedPubSubMediator { var subscribers = Set.empty[ActorRef] + val emptyTimeToLive: FiniteDuration + override def postStop(): Unit = { super.postStop() pruneTask.cancel() } - def receive = { - case msg @ Subscribe(_, ref) ⇒ + def defaultReceive: Receive = { + case msg @ Subscribe(_, _, ref) ⇒ context watch ref subscribers += ref pruneDeadline = None - sender().tell(SubscribeAck(msg), context.parent) - case msg @ Unsubscribe(_, ref) ⇒ + context.parent ! Subscribed(SubscribeAck(msg), sender()) + case msg @ Unsubscribe(_, _, ref) ⇒ context unwatch ref remove(ref) - sender().tell(UnsubscribeAck(msg), context.parent) + context.parent ! Unsubscribed(UnsubscribeAck(msg), sender()) case Terminated(ref) ⇒ remove(ref) case Prune ⇒ @@ -147,13 +183,49 @@ object DistributedPubSubMediator { subscribers foreach { _ forward msg } } - def remove(ref: ActorRef): Unit = - if (subscribers.contains(ref)) { - subscribers -= ref - if (subscribers.isEmpty) - pruneDeadline = Some(Deadline.now + emptyTimeToLive) - } + def business: Receive + def receive = business orElse defaultReceive + + def remove(ref: ActorRef): Unit = { + if (subscribers.contains(ref)) + subscribers -= ref + if (subscribers.isEmpty && context.children.isEmpty) + pruneDeadline = Some(Deadline.now + emptyTimeToLive) + } + } + + class Topic(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike { + def business = { + case msg @ Subscribe(_, Some(group), _) ⇒ + val encGroup = encName(group) + context.child(encGroup) match { + case Some(g) ⇒ g forward msg + case None ⇒ + val g = context.actorOf(Props(classOf[Group], emptyTimeToLive, routingLogic), name = encGroup) + g forward msg + context watch g + context.parent ! RegisterTopic(g) + } + pruneDeadline = None + case msg @ Unsubscribe(_, Some(group), _) ⇒ + context.child(encName(group)) match { + case Some(g) ⇒ g forward msg + case None ⇒ // no such group here + } + case msg: Subscribed ⇒ + context.parent forward msg + case msg: Unsubscribed ⇒ + context.parent forward msg + } + } + + class Group(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike { + def business = { + case SendToOneSubscriber(msg) ⇒ + if (subscribers.nonEmpty) + Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(msg, sender()) + } } } } @@ -208,6 +280,16 @@ trait DistributedPubSubMessage extends Serializable * true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging * application. * + * 4. [[DistributedPubSubMediator.Publish]] with sendOneMessageToEachGroup - + * Actors may be subscribed to a named topic with an optional property `group`. + * If subscribing with a group name, each message published to a topic with the + * `sendOneMessageToEachGroup` flag is delivered via the supplied `routingLogic` + * (default random) to one actor within each subscribing group. + * If all the subscribed actors have the same group name, then this works just like + * [[DistributedPubSubMediator.Send]] and all messages are delivered to one subscribe. + * If all the subscribed actors have different group names, then this works like normal + * [[DistributedPubSubMediator.Publish]] and all messages are broadcast to all subscribers. + * * You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or * [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and * `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same @@ -245,7 +327,7 @@ class DistributedPubSubMediator( val pruneInterval: FiniteDuration = removedTimeToLive / 2 val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune) - var registry: Map[Address, Bucket] = Map.empty.withDefault(a ⇒ Bucket(a, 0L, Map.empty)) + var registry: Map[Address, Bucket] = Map.empty.withDefault(a ⇒ Bucket(a, 0L, TreeMap.empty)) var nodes: Set[Address] = Set.empty // the version is a timestamp because it is also used when pruning removed entries @@ -293,8 +375,11 @@ class DistributedPubSubMediator( case SendToAll(path, msg, skipSenderNode) ⇒ publish(path, msg, skipSenderNode) - case Publish(topic, msg) ⇒ - publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg) + case Publish(topic, msg, sendOneMessageToEachGroup) ⇒ + if (sendOneMessageToEachGroup) + publishToEachGroup(mkKey(self.path / encName(topic)), msg) + else + publish(mkKey(self.path / encName(topic)), msg) case Put(ref: ActorRef) ⇒ if (ref.path.address.hasGlobalScope) @@ -312,24 +397,32 @@ class DistributedPubSubMediator( case _ ⇒ } - case msg @ Subscribe(topic, _) ⇒ + case msg @ Subscribe(topic, _, _) ⇒ // each topic is managed by a child actor with the same name as the topic - val encTopic = URLEncoder.encode(topic, "utf-8") + val encTopic = encName(topic) context.child(encTopic) match { case Some(t) ⇒ t forward msg case None ⇒ - val t = context.actorOf(Props(classOf[Topic], removedTimeToLive), name = encTopic) + val t = context.actorOf(Props(classOf[Topic], removedTimeToLive, routingLogic), name = encTopic) t forward msg - put(mkKey(t), Some(t)) - context.watch(t) + registerTopic(t) } - case msg @ Unsubscribe(topic, _) ⇒ - context.child(URLEncoder.encode(topic, "utf-8")) match { - case Some(g) ⇒ g forward msg + case msg @ RegisterTopic(t) ⇒ + registerTopic(t) + + case msg @ Subscribed(ack, ref) ⇒ + ref ! ack + + case msg @ Unsubscribe(topic, _, _) ⇒ + context.child(encName(topic)) match { + case Some(t) ⇒ t forward msg case None ⇒ // no such topic here } + case msg @ Unsubscribed(ack, ref) ⇒ + ref ! ack + case Status(otherVersions) ⇒ // gossip chat starts with a Status message, containing the bucket versions of the other node val delta = collectDelta(otherVersions) @@ -404,6 +497,25 @@ class DistributedPubSubMediator( } ref forward msg } + def publishToEachGroup(path: String, msg: Any): Unit = { + val prefix = path + '/' + val lastKey = path + '0' // '0' is the next char of '/' + val groups = (for { + (_, bucket) ← registry.toSeq + key ← bucket.content.range(prefix, lastKey).keys + valueHolder ← bucket.content.get(key) + ref ← valueHolder.routee + } yield (key, ref)).groupBy(_._1).values + + val wrappedMsg = SendToOneSubscriber(msg) + groups foreach { + group ⇒ + val routees = group.map(_._2).toVector + if (routees.nonEmpty) + Router(routingLogic, routees).route(wrappedMsg, sender()) + } + } + def put(key: String, valueOption: Option[ActorRef]): Unit = { val bucket = registry(selfAddress) val v = nextVersion() @@ -411,6 +523,11 @@ class DistributedPubSubMediator( content = bucket.content + (key -> ValueHolder(v, valueOption)))) } + def registerTopic(ref: ActorRef): Unit = { + put(mkKey(ref), Some(ref)) + context.watch(ref) + } + def mkKey(ref: ActorRef): String = mkKey(ref.path) def mkKey(path: ActorPath): String = path.toStringWithoutAddress @@ -434,7 +551,7 @@ class DistributedPubSubMediator( // exceeded the maxDeltaElements, pick the elements with lowest versions val sortedContent = deltaContent.toVector.sortBy(_._2.version) val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size)) - bucket.copy(content = chunk.toMap, version = chunk.last._2.version) + bucket.copy(content = TreeMap.empty[String, ValueHolder] ++ chunk, version = chunk.last._2.version) } } } diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala index 4b11a3e66f..fa9c43d0e4 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala @@ -26,6 +26,7 @@ import akka.contrib.pattern.DistributedPubSubMediator.Internal._ import akka.serialization.Serialization import akka.actor.ActorRef import akka.serialization.SerializationExtension +import scala.collection.immutable.TreeMap /** * Protobuf serializer of DistributedPubSubMediator messages. @@ -137,7 +138,7 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend private def deltaFromProto(delta: dm.Delta): Delta = Delta(delta.getBucketsList.asScala.toVector.map { b ⇒ - val content: Map[String, ValueHolder] = b.getContentList.asScala.map { entry ⇒ + val content: TreeMap[String, ValueHolder] = b.getContentList.asScala.map { entry ⇒ entry.getKey -> ValueHolder(entry.getVersion, if (entry.hasRef) Some(resolveActorRef(entry.getRef)) else None) }(breakOut) Bucket(addressFromProto(b.getOwner), b.getVersion, content) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala index e2170067f7..b941ac94ac 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -39,6 +39,9 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { final case class Talk(path: String, msg: Any) final case class TalkToOthers(path: String, msg: Any) final case class Shout(topic: String, msg: Any) + final case class ShoutToGroups(topic: String, msg: Any) + final case class JoinGroup(topic: String, group: String) + final case class ExitGroup(topic: String, group: String) } class TestChatUser(mediator: ActorRef, testActor: ActorRef) extends Actor { @@ -46,11 +49,14 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { import DistributedPubSubMediator._ def receive = { - case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) - case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) - case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, allButSelf = true) - case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) - case msg ⇒ testActor ! msg + case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) + case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) + case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, allButSelf = true) + case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) + case ShoutToGroups(topic, msg) ⇒ mediator ! Publish(topic, msg, true) + case JoinGroup(topic, group) ⇒ mediator ! Subscribe(topic, Some(group), self) + case ExitGroup(topic, group) ⇒ mediator ! Unsubscribe(topic, Some(group), self) + case msg ⇒ testActor ! msg } } @@ -76,7 +82,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { mediator ! Subscribe("content", self) def receive = { - case SubscribeAck(Subscribe("content", `self`)) ⇒ + case SubscribeAck(Subscribe("content", None, `self`)) ⇒ context become ready } @@ -345,6 +351,52 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia enterBarrier("after-11") } + "send one message to each group" in within(20 seconds) { + runOn(first) { + val u12 = createChatUser("u12") + u12 ! JoinGroup("topic2", "group1") + expectMsg(SubscribeAck(Subscribe("topic2", Some("group1"), u12))) + } + runOn(second) { + val u12 = createChatUser("u12") + u12 ! JoinGroup("topic2", "group2") + expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u12))) + + val u13 = createChatUser("u13") + u13 ! JoinGroup("topic2", "group2") + expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u13))) + } + awaitCount(17) + enterBarrier("12-registered") + + runOn(first) { + chatUser("u12") ! ShoutToGroups("topic2", "hi") + } + + runOn(first, second) { + expectMsg("hi") + expectNoMsg(2.seconds) // each group receive only one message + } + enterBarrier("12-published") + + runOn(first) { + val u12 = chatUser("u12") + u12 ! ExitGroup("topic2", "group1") + expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group1"), u12))) + } + + runOn(second) { + val u12 = chatUser("u12") + u12 ! ExitGroup("topic2", "group2") + expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u12))) + val u13 = chatUser("u13") + u13 ! ExitGroup("topic2", "group2") + expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u13))) + } + enterBarrier("after-12") + + } + "transfer delta correctly" in { val firstAddress = node(first).address val secondAddress = node(second).address @@ -354,8 +406,8 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia mediator ! Status(versions = Map.empty) val deltaBuckets = expectMsgType[Delta].buckets deltaBuckets.size should be(3) - deltaBuckets.find(_.owner == firstAddress).get.content.size should be(7) - deltaBuckets.find(_.owner == secondAddress).get.content.size should be(6) + deltaBuckets.find(_.owner == firstAddress).get.content.size should be(9) + deltaBuckets.find(_.owner == secondAddress).get.content.size should be(8) deltaBuckets.find(_.owner == thirdAddress).get.content.size should be(2) } enterBarrier("verified-initial-delta") @@ -377,15 +429,15 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia mediator ! Status(versions = deltaBuckets2.map(b ⇒ b.owner -> b.version).toMap) val deltaBuckets3 = expectMsgType[Delta].buckets - deltaBuckets3.map(_.content.size).sum should be(7 + 6 + 2 + many - 500 - 500) + deltaBuckets3.map(_.content.size).sum should be(9 + 8 + 2 + many - 500 - 500) } enterBarrier("verified-delta-with-many") within(10.seconds) { - awaitCount(13 + many) + awaitCount(17 + many) } - enterBarrier("after-12") + enterBarrier("after-13") } "remove entries when node is removed" in within(30 seconds) { @@ -403,7 +455,7 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia awaitCount(countBefore - 2) } - enterBarrier("after-13") + enterBarrier("after-14") } "receive proper unsubscribeAck message" in within(15 seconds) { diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializerSpec.scala index 638fe57c6d..a690d5f7ca 100644 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializerSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializerSpec.scala @@ -8,6 +8,7 @@ import akka.testkit.AkkaSpec import akka.contrib.pattern.DistributedPubSubMediator._ import akka.contrib.pattern.DistributedPubSubMediator.Internal._ import akka.actor.Props +import scala.collection.immutable.TreeMap @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DistributedPubSubMessageSerializerSpec extends AkkaSpec { @@ -32,9 +33,9 @@ class DistributedPubSubMessageSerializerSpec extends AkkaSpec { val u4 = system.actorOf(Props.empty, "u4") checkSerialization(Status(Map(address1 -> 3, address2 -> 17, address3 -> 5))) checkSerialization(Delta(List( - Bucket(address1, 3, Map("/user/u1" -> ValueHolder(2, Some(u1)), "/user/u2" -> ValueHolder(3, Some(u2)))), - Bucket(address2, 17, Map("/user/u3" -> ValueHolder(17, Some(u3)))), - Bucket(address3, 5, Map("/user/u4" -> ValueHolder(4, Some(u4)), "/user/u5" -> ValueHolder(5, None)))))) + Bucket(address1, 3, TreeMap("/user/u1" -> ValueHolder(2, Some(u1)), "/user/u2" -> ValueHolder(3, Some(u2)))), + Bucket(address2, 17, TreeMap("/user/u3" -> ValueHolder(17, Some(u3)))), + Bucket(address3, 5, TreeMap("/user/u4" -> ValueHolder(4, Some(u4)), "/user/u5" -> ValueHolder(5, None)))))) checkSerialization(Send("/user/u3", "hello", localAffinity = true)) checkSerialization(SendToAll("/user/u3", "hello", allButSelf = true)) checkSerialization(Publish("mytopic", "hello"))