+con #3972 Make Distributedpubsubmediator support consumer group
1. allow Topic have children topics, which are the groups 2. when publish with sendOneMessageToEachGroup flag, it will send to one actor each group
This commit is contained in:
parent
5982aab066
commit
f421e4260b
5 changed files with 227 additions and 45 deletions
|
|
@ -23,7 +23,7 @@ immediately visible at other nodes, but typically they will be fully replicated
|
||||||
to all other nodes after a few seconds.
|
to all other nodes after a few seconds.
|
||||||
|
|
||||||
You can send messages via the mediator on any node to registered actors on
|
You can send messages via the mediator on any node to registered actors on
|
||||||
any other node. There is three modes of message delivery.
|
any other node. There is four modes of message delivery.
|
||||||
|
|
||||||
**1. DistributedPubSubMediator.Send**
|
**1. DistributedPubSubMediator.Send**
|
||||||
|
|
||||||
|
|
@ -56,6 +56,17 @@ and then delivered to all subscribers of the local topic representation. This is
|
||||||
true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
|
true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
|
||||||
application.
|
application.
|
||||||
|
|
||||||
|
**4. DistributedPubSubMediator.Publish with sendOneMessageToEachGroup**
|
||||||
|
|
||||||
|
Actors may be subscribed to a named topic with an optional property (``group``).
|
||||||
|
If subscribing with a group name, each message published to a topic with the
|
||||||
|
(``sendOneMessageToEachGroup``) flag is delivered via the supplied ``RoutingLogic``
|
||||||
|
(default random) to one actor within each subscribing group.
|
||||||
|
If all the subscribed actors have the same group name, then this works just like
|
||||||
|
``Send`` and all messages are delivered to one subscriber.
|
||||||
|
If all the subscribed actors have different group names, then this works like
|
||||||
|
normal ``Publish`` and all messages are broadcast to all subscribers.
|
||||||
|
|
||||||
You register actors to the local mediator with ``DistributedPubSubMediator.Put`` or
|
You register actors to the local mediator with ``DistributedPubSubMediator.Put`` or
|
||||||
``DistributedPubSubMediator.Subscribe``. ``Put`` is used together with ``Send`` and
|
``DistributedPubSubMediator.Subscribe``. ``Put`` is used together with ``Send`` and
|
||||||
``SendToAll`` message delivery modes. The ``ActorRef`` in ``Put`` must belong to the same
|
``SendToAll`` message delivery modes. The ``ActorRef`` in ``Put`` must belong to the same
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ import akka.routing.Router
|
||||||
import akka.routing.RoundRobinRoutingLogic
|
import akka.routing.RoundRobinRoutingLogic
|
||||||
import akka.routing.ConsistentHashingRoutingLogic
|
import akka.routing.ConsistentHashingRoutingLogic
|
||||||
import akka.routing.BroadcastRoutingLogic
|
import akka.routing.BroadcastRoutingLogic
|
||||||
|
import scala.collection.immutable.TreeMap
|
||||||
|
|
||||||
object DistributedPubSubMediator {
|
object DistributedPubSubMediator {
|
||||||
|
|
||||||
|
|
@ -65,15 +66,37 @@ object DistributedPubSubMediator {
|
||||||
|
|
||||||
@SerialVersionUID(1L) final case class Put(ref: ActorRef)
|
@SerialVersionUID(1L) final case class Put(ref: ActorRef)
|
||||||
@SerialVersionUID(1L) final case class Remove(path: String)
|
@SerialVersionUID(1L) final case class Remove(path: String)
|
||||||
@SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef) {
|
@SerialVersionUID(1L) final case class Subscribe(topic: String, group: Option[String], ref: ActorRef) {
|
||||||
require(topic != null && topic != "", "topic must be defined")
|
require(topic != null && topic != "", "topic must be defined")
|
||||||
|
/**
|
||||||
|
* Convenience constructor with `group` None
|
||||||
|
*/
|
||||||
|
def this(topic: String, ref: ActorRef) = this(topic, None, ref)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: constructor with group: String
|
||||||
|
*/
|
||||||
|
def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
|
||||||
}
|
}
|
||||||
@SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef) {
|
object Subscribe {
|
||||||
|
def apply(topic: String, ref: ActorRef) = new Subscribe(topic, ref)
|
||||||
|
}
|
||||||
|
@SerialVersionUID(1L) final case class Unsubscribe(topic: String, group: Option[String], ref: ActorRef) {
|
||||||
require(topic != null && topic != "", "topic must be defined")
|
require(topic != null && topic != "", "topic must be defined")
|
||||||
|
def this(topic: String, ref: ActorRef) = this(topic, None, ref)
|
||||||
|
def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
|
||||||
|
}
|
||||||
|
object Unsubscribe {
|
||||||
|
def apply(topic: String, ref: ActorRef) = new Unsubscribe(topic, ref)
|
||||||
}
|
}
|
||||||
@SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe)
|
@SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe)
|
||||||
@SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe)
|
@SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe)
|
||||||
@SerialVersionUID(1L) final case class Publish(topic: String, msg: Any) extends DistributedPubSubMessage
|
@SerialVersionUID(1L) final case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean) extends DistributedPubSubMessage {
|
||||||
|
def this(topic: String, msg: Any) = this(topic, msg, sendOneMessageToEachGroup = false)
|
||||||
|
}
|
||||||
|
object Publish {
|
||||||
|
def apply(topic: String, msg: Any) = new Publish(topic, msg)
|
||||||
|
}
|
||||||
@SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean) extends DistributedPubSubMessage {
|
@SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean) extends DistributedPubSubMessage {
|
||||||
/**
|
/**
|
||||||
* Convenience constructor with `localAffinity` false
|
* Convenience constructor with `localAffinity` false
|
||||||
|
|
@ -97,7 +120,7 @@ object DistributedPubSubMediator {
|
||||||
final case class Bucket(
|
final case class Bucket(
|
||||||
owner: Address,
|
owner: Address,
|
||||||
version: Long,
|
version: Long,
|
||||||
content: Map[String, ValueHolder])
|
content: TreeMap[String, ValueHolder])
|
||||||
|
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
final case class ValueHolder(version: Long, ref: Option[ActorRef]) {
|
final case class ValueHolder(version: Long, ref: Option[ActorRef]) {
|
||||||
|
|
@ -111,12 +134,23 @@ object DistributedPubSubMediator {
|
||||||
|
|
||||||
case object GossipTick
|
case object GossipTick
|
||||||
|
|
||||||
|
@SerialVersionUID(1L)
|
||||||
|
final case class RegisterTopic(topicRef: ActorRef)
|
||||||
|
@SerialVersionUID(1L)
|
||||||
|
final case class Subscribed(ack: SubscribeAck, subscriber: ActorRef)
|
||||||
|
@SerialVersionUID(1L)
|
||||||
|
final case class Unsubscribed(ack: UnsubscribeAck, subscriber: ActorRef)
|
||||||
|
@SerialVersionUID(1L)
|
||||||
|
final case class SendToOneSubscriber(msg: Any)
|
||||||
|
|
||||||
def roleOption(role: String): Option[String] = role match {
|
def roleOption(role: String): Option[String] = role match {
|
||||||
case null | "" ⇒ None
|
case null | "" ⇒ None
|
||||||
case _ ⇒ Some(role)
|
case _ ⇒ Some(role)
|
||||||
}
|
}
|
||||||
|
|
||||||
class Topic(emptyTimeToLive: FiniteDuration) extends Actor {
|
def encName(s: String) = URLEncoder.encode(s, "utf-8")
|
||||||
|
|
||||||
|
trait TopicLike extends Actor {
|
||||||
import context.dispatcher
|
import context.dispatcher
|
||||||
val pruneInterval: FiniteDuration = emptyTimeToLive / 2
|
val pruneInterval: FiniteDuration = emptyTimeToLive / 2
|
||||||
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
|
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
|
||||||
|
|
@ -124,21 +158,23 @@ object DistributedPubSubMediator {
|
||||||
|
|
||||||
var subscribers = Set.empty[ActorRef]
|
var subscribers = Set.empty[ActorRef]
|
||||||
|
|
||||||
|
val emptyTimeToLive: FiniteDuration
|
||||||
|
|
||||||
override def postStop(): Unit = {
|
override def postStop(): Unit = {
|
||||||
super.postStop()
|
super.postStop()
|
||||||
pruneTask.cancel()
|
pruneTask.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
def receive = {
|
def defaultReceive: Receive = {
|
||||||
case msg @ Subscribe(_, ref) ⇒
|
case msg @ Subscribe(_, _, ref) ⇒
|
||||||
context watch ref
|
context watch ref
|
||||||
subscribers += ref
|
subscribers += ref
|
||||||
pruneDeadline = None
|
pruneDeadline = None
|
||||||
sender().tell(SubscribeAck(msg), context.parent)
|
context.parent ! Subscribed(SubscribeAck(msg), sender())
|
||||||
case msg @ Unsubscribe(_, ref) ⇒
|
case msg @ Unsubscribe(_, _, ref) ⇒
|
||||||
context unwatch ref
|
context unwatch ref
|
||||||
remove(ref)
|
remove(ref)
|
||||||
sender().tell(UnsubscribeAck(msg), context.parent)
|
context.parent ! Unsubscribed(UnsubscribeAck(msg), sender())
|
||||||
case Terminated(ref) ⇒
|
case Terminated(ref) ⇒
|
||||||
remove(ref)
|
remove(ref)
|
||||||
case Prune ⇒
|
case Prune ⇒
|
||||||
|
|
@ -147,13 +183,49 @@ object DistributedPubSubMediator {
|
||||||
subscribers foreach { _ forward msg }
|
subscribers foreach { _ forward msg }
|
||||||
}
|
}
|
||||||
|
|
||||||
def remove(ref: ActorRef): Unit =
|
def business: Receive
|
||||||
if (subscribers.contains(ref)) {
|
|
||||||
subscribers -= ref
|
|
||||||
if (subscribers.isEmpty)
|
|
||||||
pruneDeadline = Some(Deadline.now + emptyTimeToLive)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
def receive = business orElse defaultReceive
|
||||||
|
|
||||||
|
def remove(ref: ActorRef): Unit = {
|
||||||
|
if (subscribers.contains(ref))
|
||||||
|
subscribers -= ref
|
||||||
|
if (subscribers.isEmpty && context.children.isEmpty)
|
||||||
|
pruneDeadline = Some(Deadline.now + emptyTimeToLive)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Topic(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike {
|
||||||
|
def business = {
|
||||||
|
case msg @ Subscribe(_, Some(group), _) ⇒
|
||||||
|
val encGroup = encName(group)
|
||||||
|
context.child(encGroup) match {
|
||||||
|
case Some(g) ⇒ g forward msg
|
||||||
|
case None ⇒
|
||||||
|
val g = context.actorOf(Props(classOf[Group], emptyTimeToLive, routingLogic), name = encGroup)
|
||||||
|
g forward msg
|
||||||
|
context watch g
|
||||||
|
context.parent ! RegisterTopic(g)
|
||||||
|
}
|
||||||
|
pruneDeadline = None
|
||||||
|
case msg @ Unsubscribe(_, Some(group), _) ⇒
|
||||||
|
context.child(encName(group)) match {
|
||||||
|
case Some(g) ⇒ g forward msg
|
||||||
|
case None ⇒ // no such group here
|
||||||
|
}
|
||||||
|
case msg: Subscribed ⇒
|
||||||
|
context.parent forward msg
|
||||||
|
case msg: Unsubscribed ⇒
|
||||||
|
context.parent forward msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Group(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike {
|
||||||
|
def business = {
|
||||||
|
case SendToOneSubscriber(msg) ⇒
|
||||||
|
if (subscribers.nonEmpty)
|
||||||
|
Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(msg, sender())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -208,6 +280,16 @@ trait DistributedPubSubMessage extends Serializable
|
||||||
* true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
|
* true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
|
||||||
* application.
|
* application.
|
||||||
*
|
*
|
||||||
|
* 4. [[DistributedPubSubMediator.Publish]] with sendOneMessageToEachGroup -
|
||||||
|
* Actors may be subscribed to a named topic with an optional property `group`.
|
||||||
|
* If subscribing with a group name, each message published to a topic with the
|
||||||
|
* `sendOneMessageToEachGroup` flag is delivered via the supplied `routingLogic`
|
||||||
|
* (default random) to one actor within each subscribing group.
|
||||||
|
* If all the subscribed actors have the same group name, then this works just like
|
||||||
|
* [[DistributedPubSubMediator.Send]] and all messages are delivered to one subscribe.
|
||||||
|
* If all the subscribed actors have different group names, then this works like normal
|
||||||
|
* [[DistributedPubSubMediator.Publish]] and all messages are broadcast to all subscribers.
|
||||||
|
*
|
||||||
* You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or
|
* You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or
|
||||||
* [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and
|
* [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and
|
||||||
* `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same
|
* `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same
|
||||||
|
|
@ -245,7 +327,7 @@ class DistributedPubSubMediator(
|
||||||
val pruneInterval: FiniteDuration = removedTimeToLive / 2
|
val pruneInterval: FiniteDuration = removedTimeToLive / 2
|
||||||
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
|
val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
|
||||||
|
|
||||||
var registry: Map[Address, Bucket] = Map.empty.withDefault(a ⇒ Bucket(a, 0L, Map.empty))
|
var registry: Map[Address, Bucket] = Map.empty.withDefault(a ⇒ Bucket(a, 0L, TreeMap.empty))
|
||||||
var nodes: Set[Address] = Set.empty
|
var nodes: Set[Address] = Set.empty
|
||||||
|
|
||||||
// the version is a timestamp because it is also used when pruning removed entries
|
// the version is a timestamp because it is also used when pruning removed entries
|
||||||
|
|
@ -293,8 +375,11 @@ class DistributedPubSubMediator(
|
||||||
case SendToAll(path, msg, skipSenderNode) ⇒
|
case SendToAll(path, msg, skipSenderNode) ⇒
|
||||||
publish(path, msg, skipSenderNode)
|
publish(path, msg, skipSenderNode)
|
||||||
|
|
||||||
case Publish(topic, msg) ⇒
|
case Publish(topic, msg, sendOneMessageToEachGroup) ⇒
|
||||||
publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg)
|
if (sendOneMessageToEachGroup)
|
||||||
|
publishToEachGroup(mkKey(self.path / encName(topic)), msg)
|
||||||
|
else
|
||||||
|
publish(mkKey(self.path / encName(topic)), msg)
|
||||||
|
|
||||||
case Put(ref: ActorRef) ⇒
|
case Put(ref: ActorRef) ⇒
|
||||||
if (ref.path.address.hasGlobalScope)
|
if (ref.path.address.hasGlobalScope)
|
||||||
|
|
@ -312,24 +397,32 @@ class DistributedPubSubMediator(
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
case msg @ Subscribe(topic, _) ⇒
|
case msg @ Subscribe(topic, _, _) ⇒
|
||||||
// each topic is managed by a child actor with the same name as the topic
|
// each topic is managed by a child actor with the same name as the topic
|
||||||
val encTopic = URLEncoder.encode(topic, "utf-8")
|
val encTopic = encName(topic)
|
||||||
context.child(encTopic) match {
|
context.child(encTopic) match {
|
||||||
case Some(t) ⇒ t forward msg
|
case Some(t) ⇒ t forward msg
|
||||||
case None ⇒
|
case None ⇒
|
||||||
val t = context.actorOf(Props(classOf[Topic], removedTimeToLive), name = encTopic)
|
val t = context.actorOf(Props(classOf[Topic], removedTimeToLive, routingLogic), name = encTopic)
|
||||||
t forward msg
|
t forward msg
|
||||||
put(mkKey(t), Some(t))
|
registerTopic(t)
|
||||||
context.watch(t)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case msg @ Unsubscribe(topic, _) ⇒
|
case msg @ RegisterTopic(t) ⇒
|
||||||
context.child(URLEncoder.encode(topic, "utf-8")) match {
|
registerTopic(t)
|
||||||
case Some(g) ⇒ g forward msg
|
|
||||||
|
case msg @ Subscribed(ack, ref) ⇒
|
||||||
|
ref ! ack
|
||||||
|
|
||||||
|
case msg @ Unsubscribe(topic, _, _) ⇒
|
||||||
|
context.child(encName(topic)) match {
|
||||||
|
case Some(t) ⇒ t forward msg
|
||||||
case None ⇒ // no such topic here
|
case None ⇒ // no such topic here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case msg @ Unsubscribed(ack, ref) ⇒
|
||||||
|
ref ! ack
|
||||||
|
|
||||||
case Status(otherVersions) ⇒
|
case Status(otherVersions) ⇒
|
||||||
// gossip chat starts with a Status message, containing the bucket versions of the other node
|
// gossip chat starts with a Status message, containing the bucket versions of the other node
|
||||||
val delta = collectDelta(otherVersions)
|
val delta = collectDelta(otherVersions)
|
||||||
|
|
@ -404,6 +497,25 @@ class DistributedPubSubMediator(
|
||||||
} ref forward msg
|
} ref forward msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def publishToEachGroup(path: String, msg: Any): Unit = {
|
||||||
|
val prefix = path + '/'
|
||||||
|
val lastKey = path + '0' // '0' is the next char of '/'
|
||||||
|
val groups = (for {
|
||||||
|
(_, bucket) ← registry.toSeq
|
||||||
|
key ← bucket.content.range(prefix, lastKey).keys
|
||||||
|
valueHolder ← bucket.content.get(key)
|
||||||
|
ref ← valueHolder.routee
|
||||||
|
} yield (key, ref)).groupBy(_._1).values
|
||||||
|
|
||||||
|
val wrappedMsg = SendToOneSubscriber(msg)
|
||||||
|
groups foreach {
|
||||||
|
group ⇒
|
||||||
|
val routees = group.map(_._2).toVector
|
||||||
|
if (routees.nonEmpty)
|
||||||
|
Router(routingLogic, routees).route(wrappedMsg, sender())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def put(key: String, valueOption: Option[ActorRef]): Unit = {
|
def put(key: String, valueOption: Option[ActorRef]): Unit = {
|
||||||
val bucket = registry(selfAddress)
|
val bucket = registry(selfAddress)
|
||||||
val v = nextVersion()
|
val v = nextVersion()
|
||||||
|
|
@ -411,6 +523,11 @@ class DistributedPubSubMediator(
|
||||||
content = bucket.content + (key -> ValueHolder(v, valueOption))))
|
content = bucket.content + (key -> ValueHolder(v, valueOption))))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def registerTopic(ref: ActorRef): Unit = {
|
||||||
|
put(mkKey(ref), Some(ref))
|
||||||
|
context.watch(ref)
|
||||||
|
}
|
||||||
|
|
||||||
def mkKey(ref: ActorRef): String = mkKey(ref.path)
|
def mkKey(ref: ActorRef): String = mkKey(ref.path)
|
||||||
|
|
||||||
def mkKey(path: ActorPath): String = path.toStringWithoutAddress
|
def mkKey(path: ActorPath): String = path.toStringWithoutAddress
|
||||||
|
|
@ -434,7 +551,7 @@ class DistributedPubSubMediator(
|
||||||
// exceeded the maxDeltaElements, pick the elements with lowest versions
|
// exceeded the maxDeltaElements, pick the elements with lowest versions
|
||||||
val sortedContent = deltaContent.toVector.sortBy(_._2.version)
|
val sortedContent = deltaContent.toVector.sortBy(_._2.version)
|
||||||
val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size))
|
val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size))
|
||||||
bucket.copy(content = chunk.toMap, version = chunk.last._2.version)
|
bucket.copy(content = TreeMap.empty[String, ValueHolder] ++ chunk, version = chunk.last._2.version)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import akka.contrib.pattern.DistributedPubSubMediator.Internal._
|
||||||
import akka.serialization.Serialization
|
import akka.serialization.Serialization
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
|
import scala.collection.immutable.TreeMap
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Protobuf serializer of DistributedPubSubMediator messages.
|
* Protobuf serializer of DistributedPubSubMediator messages.
|
||||||
|
|
@ -137,7 +138,7 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend
|
||||||
|
|
||||||
private def deltaFromProto(delta: dm.Delta): Delta =
|
private def deltaFromProto(delta: dm.Delta): Delta =
|
||||||
Delta(delta.getBucketsList.asScala.toVector.map { b ⇒
|
Delta(delta.getBucketsList.asScala.toVector.map { b ⇒
|
||||||
val content: Map[String, ValueHolder] = b.getContentList.asScala.map { entry ⇒
|
val content: TreeMap[String, ValueHolder] = b.getContentList.asScala.map { entry ⇒
|
||||||
entry.getKey -> ValueHolder(entry.getVersion, if (entry.hasRef) Some(resolveActorRef(entry.getRef)) else None)
|
entry.getKey -> ValueHolder(entry.getVersion, if (entry.hasRef) Some(resolveActorRef(entry.getRef)) else None)
|
||||||
}(breakOut)
|
}(breakOut)
|
||||||
Bucket(addressFromProto(b.getOwner), b.getVersion, content)
|
Bucket(addressFromProto(b.getOwner), b.getVersion, content)
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,9 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
|
||||||
final case class Talk(path: String, msg: Any)
|
final case class Talk(path: String, msg: Any)
|
||||||
final case class TalkToOthers(path: String, msg: Any)
|
final case class TalkToOthers(path: String, msg: Any)
|
||||||
final case class Shout(topic: String, msg: Any)
|
final case class Shout(topic: String, msg: Any)
|
||||||
|
final case class ShoutToGroups(topic: String, msg: Any)
|
||||||
|
final case class JoinGroup(topic: String, group: String)
|
||||||
|
final case class ExitGroup(topic: String, group: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestChatUser(mediator: ActorRef, testActor: ActorRef) extends Actor {
|
class TestChatUser(mediator: ActorRef, testActor: ActorRef) extends Actor {
|
||||||
|
|
@ -46,11 +49,14 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
|
||||||
import DistributedPubSubMediator._
|
import DistributedPubSubMediator._
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true)
|
case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true)
|
||||||
case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg)
|
case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg)
|
||||||
case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, allButSelf = true)
|
case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, allButSelf = true)
|
||||||
case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg)
|
case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg)
|
||||||
case msg ⇒ testActor ! msg
|
case ShoutToGroups(topic, msg) ⇒ mediator ! Publish(topic, msg, true)
|
||||||
|
case JoinGroup(topic, group) ⇒ mediator ! Subscribe(topic, Some(group), self)
|
||||||
|
case ExitGroup(topic, group) ⇒ mediator ! Unsubscribe(topic, Some(group), self)
|
||||||
|
case msg ⇒ testActor ! msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,7 +82,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
|
||||||
mediator ! Subscribe("content", self)
|
mediator ! Subscribe("content", self)
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case SubscribeAck(Subscribe("content", `self`)) ⇒
|
case SubscribeAck(Subscribe("content", None, `self`)) ⇒
|
||||||
context become ready
|
context become ready
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -345,6 +351,52 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
|
||||||
enterBarrier("after-11")
|
enterBarrier("after-11")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"send one message to each group" in within(20 seconds) {
|
||||||
|
runOn(first) {
|
||||||
|
val u12 = createChatUser("u12")
|
||||||
|
u12 ! JoinGroup("topic2", "group1")
|
||||||
|
expectMsg(SubscribeAck(Subscribe("topic2", Some("group1"), u12)))
|
||||||
|
}
|
||||||
|
runOn(second) {
|
||||||
|
val u12 = createChatUser("u12")
|
||||||
|
u12 ! JoinGroup("topic2", "group2")
|
||||||
|
expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u12)))
|
||||||
|
|
||||||
|
val u13 = createChatUser("u13")
|
||||||
|
u13 ! JoinGroup("topic2", "group2")
|
||||||
|
expectMsg(SubscribeAck(Subscribe("topic2", Some("group2"), u13)))
|
||||||
|
}
|
||||||
|
awaitCount(17)
|
||||||
|
enterBarrier("12-registered")
|
||||||
|
|
||||||
|
runOn(first) {
|
||||||
|
chatUser("u12") ! ShoutToGroups("topic2", "hi")
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(first, second) {
|
||||||
|
expectMsg("hi")
|
||||||
|
expectNoMsg(2.seconds) // each group receive only one message
|
||||||
|
}
|
||||||
|
enterBarrier("12-published")
|
||||||
|
|
||||||
|
runOn(first) {
|
||||||
|
val u12 = chatUser("u12")
|
||||||
|
u12 ! ExitGroup("topic2", "group1")
|
||||||
|
expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group1"), u12)))
|
||||||
|
}
|
||||||
|
|
||||||
|
runOn(second) {
|
||||||
|
val u12 = chatUser("u12")
|
||||||
|
u12 ! ExitGroup("topic2", "group2")
|
||||||
|
expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u12)))
|
||||||
|
val u13 = chatUser("u13")
|
||||||
|
u13 ! ExitGroup("topic2", "group2")
|
||||||
|
expectMsg(UnsubscribeAck(Unsubscribe("topic2", Some("group2"), u13)))
|
||||||
|
}
|
||||||
|
enterBarrier("after-12")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
"transfer delta correctly" in {
|
"transfer delta correctly" in {
|
||||||
val firstAddress = node(first).address
|
val firstAddress = node(first).address
|
||||||
val secondAddress = node(second).address
|
val secondAddress = node(second).address
|
||||||
|
|
@ -354,8 +406,8 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
|
||||||
mediator ! Status(versions = Map.empty)
|
mediator ! Status(versions = Map.empty)
|
||||||
val deltaBuckets = expectMsgType[Delta].buckets
|
val deltaBuckets = expectMsgType[Delta].buckets
|
||||||
deltaBuckets.size should be(3)
|
deltaBuckets.size should be(3)
|
||||||
deltaBuckets.find(_.owner == firstAddress).get.content.size should be(7)
|
deltaBuckets.find(_.owner == firstAddress).get.content.size should be(9)
|
||||||
deltaBuckets.find(_.owner == secondAddress).get.content.size should be(6)
|
deltaBuckets.find(_.owner == secondAddress).get.content.size should be(8)
|
||||||
deltaBuckets.find(_.owner == thirdAddress).get.content.size should be(2)
|
deltaBuckets.find(_.owner == thirdAddress).get.content.size should be(2)
|
||||||
}
|
}
|
||||||
enterBarrier("verified-initial-delta")
|
enterBarrier("verified-initial-delta")
|
||||||
|
|
@ -377,15 +429,15 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
|
||||||
mediator ! Status(versions = deltaBuckets2.map(b ⇒ b.owner -> b.version).toMap)
|
mediator ! Status(versions = deltaBuckets2.map(b ⇒ b.owner -> b.version).toMap)
|
||||||
val deltaBuckets3 = expectMsgType[Delta].buckets
|
val deltaBuckets3 = expectMsgType[Delta].buckets
|
||||||
|
|
||||||
deltaBuckets3.map(_.content.size).sum should be(7 + 6 + 2 + many - 500 - 500)
|
deltaBuckets3.map(_.content.size).sum should be(9 + 8 + 2 + many - 500 - 500)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("verified-delta-with-many")
|
enterBarrier("verified-delta-with-many")
|
||||||
within(10.seconds) {
|
within(10.seconds) {
|
||||||
awaitCount(13 + many)
|
awaitCount(17 + many)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-12")
|
enterBarrier("after-13")
|
||||||
}
|
}
|
||||||
|
|
||||||
"remove entries when node is removed" in within(30 seconds) {
|
"remove entries when node is removed" in within(30 seconds) {
|
||||||
|
|
@ -403,7 +455,7 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
|
||||||
awaitCount(countBefore - 2)
|
awaitCount(countBefore - 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-13")
|
enterBarrier("after-14")
|
||||||
}
|
}
|
||||||
|
|
||||||
"receive proper unsubscribeAck message" in within(15 seconds) {
|
"receive proper unsubscribeAck message" in within(15 seconds) {
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import akka.testkit.AkkaSpec
|
||||||
import akka.contrib.pattern.DistributedPubSubMediator._
|
import akka.contrib.pattern.DistributedPubSubMediator._
|
||||||
import akka.contrib.pattern.DistributedPubSubMediator.Internal._
|
import akka.contrib.pattern.DistributedPubSubMediator.Internal._
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
|
import scala.collection.immutable.TreeMap
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class DistributedPubSubMessageSerializerSpec extends AkkaSpec {
|
class DistributedPubSubMessageSerializerSpec extends AkkaSpec {
|
||||||
|
|
@ -32,9 +33,9 @@ class DistributedPubSubMessageSerializerSpec extends AkkaSpec {
|
||||||
val u4 = system.actorOf(Props.empty, "u4")
|
val u4 = system.actorOf(Props.empty, "u4")
|
||||||
checkSerialization(Status(Map(address1 -> 3, address2 -> 17, address3 -> 5)))
|
checkSerialization(Status(Map(address1 -> 3, address2 -> 17, address3 -> 5)))
|
||||||
checkSerialization(Delta(List(
|
checkSerialization(Delta(List(
|
||||||
Bucket(address1, 3, Map("/user/u1" -> ValueHolder(2, Some(u1)), "/user/u2" -> ValueHolder(3, Some(u2)))),
|
Bucket(address1, 3, TreeMap("/user/u1" -> ValueHolder(2, Some(u1)), "/user/u2" -> ValueHolder(3, Some(u2)))),
|
||||||
Bucket(address2, 17, Map("/user/u3" -> ValueHolder(17, Some(u3)))),
|
Bucket(address2, 17, TreeMap("/user/u3" -> ValueHolder(17, Some(u3)))),
|
||||||
Bucket(address3, 5, Map("/user/u4" -> ValueHolder(4, Some(u4)), "/user/u5" -> ValueHolder(5, None))))))
|
Bucket(address3, 5, TreeMap("/user/u4" -> ValueHolder(4, Some(u4)), "/user/u5" -> ValueHolder(5, None))))))
|
||||||
checkSerialization(Send("/user/u3", "hello", localAffinity = true))
|
checkSerialization(Send("/user/u3", "hello", localAffinity = true))
|
||||||
checkSerialization(SendToAll("/user/u3", "hello", allButSelf = true))
|
checkSerialization(SendToAll("/user/u3", "hello", allButSelf = true))
|
||||||
checkSerialization(Publish("mytopic", "hello"))
|
checkSerialization(Publish("mytopic", "hello"))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue