2013-04-06 16:22:30 +02:00
|
|
|
/**
|
2014-02-02 19:05:45 -06:00
|
|
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
2013-04-06 16:22:30 +02:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.contrib.pattern
|
|
|
|
|
|
|
|
|
|
import scala.collection.immutable
|
|
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
import scala.concurrent.forkjoin.ThreadLocalRandom
|
|
|
|
|
import java.net.URLEncoder
|
|
|
|
|
import akka.actor.Actor
|
|
|
|
|
import akka.actor.ActorLogging
|
|
|
|
|
import akka.actor.ActorPath
|
|
|
|
|
import akka.actor.ActorRef
|
|
|
|
|
import akka.actor.ActorSystem
|
|
|
|
|
import akka.actor.Address
|
|
|
|
|
import akka.actor.ExtendedActorSystem
|
|
|
|
|
import akka.actor.Extension
|
|
|
|
|
import akka.actor.ExtensionId
|
|
|
|
|
import akka.actor.ExtensionIdProvider
|
|
|
|
|
import akka.actor.Props
|
|
|
|
|
import akka.actor.Terminated
|
|
|
|
|
import akka.cluster.Cluster
|
|
|
|
|
import akka.cluster.ClusterEvent._
|
|
|
|
|
import akka.cluster.Member
|
|
|
|
|
import akka.cluster.MemberStatus
|
2013-10-16 13:19:36 +02:00
|
|
|
import akka.routing.RandomRoutingLogic
|
|
|
|
|
import akka.routing.RoutingLogic
|
|
|
|
|
import akka.routing.Routee
|
|
|
|
|
import akka.routing.ActorRefRoutee
|
|
|
|
|
import akka.routing.Router
|
|
|
|
|
import akka.routing.RoundRobinRoutingLogic
|
|
|
|
|
import akka.routing.ConsistentHashingRoutingLogic
|
|
|
|
|
import akka.routing.BroadcastRoutingLogic
|
2013-04-06 16:22:30 +02:00
|
|
|
|
|
|
|
|
/**
 * Companion of the [[DistributedPubSubMediator]] actor: `Props` factories,
 * the message protocol understood by the mediator and internal helpers.
 */
object DistributedPubSubMediator {

  /**
   * Scala API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]].
   *
   * The arguments are handed, in this order, to the constructor of
   * [[DistributedPubSubMediator]].
   */
  def props(
    role: Option[String],
    routingLogic: RoutingLogic = RandomRoutingLogic(),
    gossipInterval: FiniteDuration = 1.second,
    removedTimeToLive: FiniteDuration = 2.minutes,
    maxDeltaElements: Int = 3000): Props =
    Props(
      classOf[DistributedPubSubMediator],
      role,
      routingLogic,
      gossipInterval,
      removedTimeToLive,
      maxDeltaElements)

  /**
   * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]].
   *
   * A `null` or empty `role` means that no role is required.
   */
  def props(
    role: String,
    routingLogic: RoutingLogic,
    gossipInterval: FiniteDuration,
    removedTimeToLive: FiniteDuration,
    maxDeltaElements: Int): Props = {
    val roleOpt = Internal.roleOption(role)
    props(roleOpt, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements)
  }

  /**
   * Java API: Factory method for `DistributedPubSubMediator` [[akka.actor.Props]]
   * with default values.
   *
   * A `null` or empty `role` means that no role is required.
   */
  def defaultProps(role: String): Props = {
    val roleOpt = Internal.roleOption(role)
    props(roleOpt)
  }

  /** Register `ref` in the registry of the local mediator. */
  @SerialVersionUID(1L) final case class Put(ref: ActorRef)

  /** Remove the entry stored under `path` from the registry of the local mediator. */
  @SerialVersionUID(1L) final case class Remove(path: String)

  /** Add `ref` as subscriber of `topic`; answered with [[SubscribeAck]]. */
  @SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef)

  /** Remove `ref` from the subscribers of `topic`; answered with [[UnsubscribeAck]]. */
  @SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef)

  /** Reply to a [[Subscribe]], carrying the original request. */
  @SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe)

  /** Reply to an [[Unsubscribe]], carrying the original request. */
  @SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe)

  /** Deliver `msg` to the subscribers of `topic`. */
  @SerialVersionUID(1L) final case class Publish(topic: String, msg: Any) extends DistributedPubSubMessage

  /**
   * Deliver `msg` to one registered actor with a matching `path`. With
   * `localAffinity` an entry registered at the receiving mediator is preferred.
   */
  @SerialVersionUID(1L) final case class Send(path: String, msg: Any, localAffinity: Boolean) extends DistributedPubSubMessage {
    /**
     * Convenience constructor with `localAffinity` false
     */
    def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
  }

  /**
   * Deliver `msg` to all registered actors with a matching `path`. With
   * `allButSelf` the entries owned by the receiving mediator's node are skipped.
   */
  @SerialVersionUID(1L) final case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false) extends DistributedPubSubMessage {
    /**
     * Convenience constructor with `allButSelf` false
     */
    def this(path: String, msg: Any) = this(path, msg, allButSelf = false)
  }

  // Only for testing purposes, to poll/await replication
  case object Count

  /**
   * INTERNAL API
   */
  private[pattern] object Internal {

    /** Tick that triggers a pruning round in the mediator and in each [[Topic]]. */
    case object Prune

    /**
     * The part of the registry that belongs to the node `owner`.
     * `version` is the version of the latest change applied to `content`.
     */
    @SerialVersionUID(1L)
    final case class Bucket(
      owner: Address,
      version: Long,
      content: Map[String, ValueHolder])

    /**
     * One registry entry. A `ref` of `None` marks an entry that has been removed.
     */
    @SerialVersionUID(1L)
    final case class ValueHolder(version: Long, ref: Option[ActorRef]) {
      // not part of the serialized form, derived from `ref` on first access
      @transient lazy val routee: Option[Routee] = ref.map(ActorRefRoutee(_))
    }

    /** Gossip message carrying the bucket version known for each owner address. */
    @SerialVersionUID(1L)
    final case class Status(versions: Map[Address, Long]) extends DistributedPubSubMessage

    /** Gossip message carrying buckets with entries the receiver may be missing. */
    @SerialVersionUID(1L)
    final case class Delta(buckets: immutable.Iterable[Bucket]) extends DistributedPubSubMessage

    /** Tick that triggers a gossip round in the mediator. */
    case object GossipTick

    /**
     * Translate a role given as a possibly `null` or empty string into an `Option`.
     */
    def roleOption(role: String): Option[String] =
      Option(role).filter(_.nonEmpty)

    /**
     * Actor that keeps the local subscribers of one topic and forwards every
     * message that is not part of its own protocol to all of them.
     *
     * Once the last subscriber is gone a deadline of `emptyTimeToLive` is armed;
     * the actor stops itself on the first `Prune` tick after that deadline has
     * passed, unless a new subscriber arrived in the meantime.
     */
    class Topic(emptyTimeToLive: FiniteDuration) extends Actor {
      import context.dispatcher

      val pruneInterval: FiniteDuration = emptyTimeToLive / 2
      val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)
      var pruneDeadline: Option[Deadline] = None

      var subscribers: Set[ActorRef] = Set.empty

      override def postStop(): Unit = {
        super.postStop()
        pruneTask.cancel()
      }

      def receive: Receive = {
        case subscribe @ Subscribe(_, subscriber) ⇒
          context.watch(subscriber)
          subscribers += subscriber
          pruneDeadline = None
          // the acknowledgement appears to come from the parent, not from this actor
          sender().tell(SubscribeAck(subscribe), context.parent)

        case unsubscribe @ Unsubscribe(_, subscriber) ⇒
          context.unwatch(subscriber)
          remove(subscriber)
          sender().tell(UnsubscribeAck(unsubscribe), context.parent)

        case Terminated(subscriber) ⇒
          remove(subscriber)

        case Prune ⇒
          if (pruneDeadline.exists(_.isOverdue)) context.stop(self)

        case published ⇒
          subscribers.foreach(_.forward(published))
      }

      /**
       * Drop `ref` from the subscribers, if present, and arm the prune deadline
       * when that leaves the topic without subscribers.
       */
      def remove(ref: ActorRef): Unit =
        if (subscribers(ref)) {
          subscribers -= ref
          if (subscribers.isEmpty) pruneDeadline = Some(emptyTimeToLive.fromNow)
        }
    }
  }
}
|
|
|
|
|
|
2013-10-21 18:37:01 +02:00
|
|
|
/**
 * Marker trait for remote messages with special serializer.
 *
 * It adds no members of its own; it only tags a message type and makes it
 * `Serializable`.
 */
trait DistributedPubSubMessage extends Serializable
|
|
|
|
|
|
2013-04-06 16:22:30 +02:00
|
|
|
/**
|
|
|
|
|
* This actor manages a registry of actor references and replicates
|
|
|
|
|
* the entries to peer actors among all cluster nodes or a group of nodes
|
|
|
|
|
* tagged with a specific role.
|
|
|
|
|
*
|
|
|
|
|
* The `DistributedPubSubMediator` is supposed to be started on all nodes,
|
|
|
|
|
* or all nodes with specified role, in the cluster. The mediator can be
|
|
|
|
|
* started with the [[DistributedPubSubExtension]] or as an ordinary actor.
|
|
|
|
|
*
|
|
|
|
|
* Changes are only performed in the own part of the registry and those changes
|
|
|
|
|
* are versioned. Deltas are disseminated in a scalable way to other nodes with
|
|
|
|
|
* a gossip protocol. The registry is eventually consistent, i.e. changes are not
|
|
|
|
|
* immediately visible at other nodes, but typically they will be fully replicated
|
|
|
|
|
* to all other nodes after a few seconds.
|
|
|
|
|
*
|
|
|
|
|
* You can send messages via the mediator on any node to registered actors on
|
|
|
|
|
* any other node. There are three modes of message delivery.
|
|
|
|
|
*
|
|
|
|
|
* 1. [[DistributedPubSubMediator.Send]] -
|
|
|
|
|
* The message will be delivered to one recipient with a matching path, if any such
|
2013-10-16 13:19:36 +02:00
|
|
|
* exists in the registry. If several entries match the path the message will be sent
|
|
|
|
|
* via the supplied `routingLogic` (default random) to one destination. The sender of the
|
|
|
|
|
* message can specify that local affinity is preferred, i.e. the message is sent to an actor
|
|
|
|
|
* in the same local actor system as the used mediator actor, if any such exists, otherwise
|
|
|
|
|
* route to any other matching entry. A typical usage of this mode is private chat to one
|
|
|
|
|
* other user in an instant messaging application. It can also be used for distributing
|
|
|
|
|
* tasks to registered workers, like a cluster aware router where the routees dynamically
|
|
|
|
|
* can register themselves.
|
2013-04-06 16:22:30 +02:00
|
|
|
*
|
|
|
|
|
* 2. [[DistributedPubSubMediator.SendToAll]] -
|
|
|
|
|
* The message will be delivered to all recipients with a matching path. Actors with
|
|
|
|
|
* the same path, without address information, can be registered on different nodes.
|
|
|
|
|
* On each node there can only be one such actor, since the path is unique within one
|
|
|
|
|
* local actor system. Typical usage of this mode is to broadcast messages to all replicas
|
|
|
|
|
* with the same path, e.g. 3 actors on different nodes that all perform the same actions,
|
|
|
|
|
* for redundancy.
|
|
|
|
|
*
|
|
|
|
|
* 3. [[DistributedPubSubMediator.Publish]] -
|
|
|
|
|
* Actors may be registered to a named topic instead of path. This enables many subscribers
|
|
|
|
|
* on each node. The message will be delivered to all subscribers of the topic. For
|
|
|
|
|
* efficiency the message is sent over the wire only once per node (that has a matching topic),
|
|
|
|
|
* and then delivered to all subscribers of the local topic representation. This is the
|
|
|
|
|
* true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
|
|
|
|
|
* application.
|
|
|
|
|
*
|
|
|
|
|
* You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or
|
|
|
|
|
* [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and
|
|
|
|
|
* `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same
|
|
|
|
|
* local actor system as the mediator. `Subscribe` is used together with `Publish`.
|
|
|
|
|
* Actors are automatically removed from the registry when they are terminated, or you
|
|
|
|
|
* can explicitly remove entries with [[DistributedPubSubMediator.Remove]] or
|
|
|
|
|
* [[DistributedPubSubMediator.Unsubscribe]].
|
|
|
|
|
*
|
|
|
|
|
* Successful `Subscribe` and `Unsubscribe` are acknowledged with
|
|
|
|
|
* [[DistributedPubSubMediator.SubscribeAck]] and [[DistributedPubSubMediator.UnsubscribeAck]]
|
|
|
|
|
* replies.
|
|
|
|
|
*/
|
|
|
|
|
class DistributedPubSubMediator(
  role: Option[String],
  routingLogic: RoutingLogic,
  gossipInterval: FiniteDuration,
  removedTimeToLive: FiniteDuration,
  maxDeltaElements: Int)
  extends Actor with ActorLogging {

  import DistributedPubSubMediator._
  import DistributedPubSubMediator.Internal._

  val cluster = Cluster(context.system)
  import cluster.selfAddress

  // fail fast when this node is not tagged with the role the mediator is restricted to
  require(role.forall(cluster.selfRoles.contains),
    s"This cluster member [${selfAddress}] doesn't have the role [$role]")

  val removedTimeToLiveMillis = removedTimeToLive.toMillis

  // Start periodic gossip to random nodes in cluster
  import context.dispatcher
  val gossipTask = context.system.scheduler.schedule(gossipInterval, gossipInterval, self, GossipTick)
  val pruneInterval: FiniteDuration = removedTimeToLive / 2
  val pruneTask = context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)

  // one bucket per owner address; an unknown address yields an empty bucket with version 0
  var registry: Map[Address, Bucket] = Map.empty.withDefault(a ⇒ Bucket(a, 0L, Map.empty))
  // addresses of the members (with matching role) that are gossip peers and accepted bucket owners
  var nodes: Set[Address] = Set.empty

  // the version is a timestamp because it is also used when pruning removed entries
  // (it is bumped by one when the clock has not advanced, so it is strictly increasing)
  val nextVersion = {
    var version = 0L
    () ⇒ {
      val current = System.currentTimeMillis
      version = if (current > version) current else version + 1
      version
    }
  }

  override def preStart(): Unit = {
    super.preStart()
    require(!cluster.isTerminated, "Cluster node must not be terminated")
    cluster.subscribe(self, classOf[MemberEvent])
  }

  override def postStop(): Unit = {
    super.postStop()
    cluster unsubscribe self
    gossipTask.cancel()
    pruneTask.cancel()
  }

  /** True when no role is required, or when `m` is tagged with the required role. */
  def matchingRole(m: Member): Boolean = role.forall(m.hasRole)

  def receive = {

    case Send(path, msg, localAffinity) ⇒
      registry(selfAddress).content.get(path) match {
        case Some(ValueHolder(_, Some(ref))) if localAffinity ⇒
          // a live local entry exists and the sender asked for it to be preferred
          ref forward msg
        case _ ⇒
          // otherwise pick among all live entries for the path, over all buckets
          val routees = (for {
            (_, bucket) ← registry
            valueHolder ← bucket.content.get(path)
            routee ← valueHolder.routee
          } yield routee).toVector

          if (routees.nonEmpty)
            Router(routingLogic, routees).route(msg, sender())
      }

    case SendToAll(path, msg, skipSenderNode) ⇒
      publish(path, msg, skipSenderNode)

    case Publish(topic, msg) ⇒
      // topics are registered under the path of the child actor named by the encoded topic
      publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg)

    case Put(ref: ActorRef) ⇒
      if (ref.path.address.hasGlobalScope)
        log.warning("Registered actor must be local: [{}]", ref)
      else {
        put(mkKey(ref), Some(ref))
        context.watch(ref)
      }

    case Remove(key) ⇒
      registry(selfAddress).content.get(key) match {
        case Some(ValueHolder(_, Some(ref))) ⇒
          context.unwatch(ref)
          put(key, None)
        case _ ⇒
      }

    case msg @ Subscribe(topic, _) ⇒
      // each topic is managed by a child actor with the same name as the topic
      val encTopic = URLEncoder.encode(topic, "utf-8")
      context.child(encTopic) match {
        case Some(t) ⇒ t forward msg
        case None ⇒
          val t = context.actorOf(Props(classOf[Topic], removedTimeToLive), name = encTopic)
          t forward msg
          put(mkKey(t), Some(t))
          context.watch(t)
      }

    case msg @ Unsubscribe(topic, _) ⇒
      context.child(URLEncoder.encode(topic, "utf-8")) match {
        // forward, like Subscribe does: the topic replies to its sender(), so the
        // original sender must be preserved for the UnsubscribeAck to reach it
        case Some(g) ⇒ g forward msg
        case None    ⇒ // no such topic here
      }

    case Status(otherVersions) ⇒
      // gossip chat starts with a Status message, containing the bucket versions of the other node
      val delta = collectDelta(otherVersions)
      if (delta.nonEmpty)
        sender() ! Delta(delta)
      if (otherHasNewerVersions(otherVersions))
        sender() ! Status(versions = myVersions) // it will reply with Delta

    case Delta(buckets) ⇒
      // reply from Status message in the gossip chat
      // the Delta contains potential updates (newer versions) from the other node
      // only accept deltas/buckets from known nodes, otherwise there is a risk of
      // adding back entries when nodes are removed
      if (nodes(sender().path.address)) {
        buckets foreach { b ⇒
          if (nodes(b.owner)) {
            val myBucket = registry(b.owner)
            if (b.version > myBucket.version) {
              registry += (b.owner -> myBucket.copy(version = b.version, content = myBucket.content ++ b.content))
            }
          }
        }
      }

    case GossipTick ⇒ gossip()

    case Prune      ⇒ prune()

    case Terminated(a) ⇒
      val key = mkKey(a)
      registry(selfAddress).content.get(key) match {
        // only when the entry still refers to the terminated actor itself
        case Some(ValueHolder(_, Some(`a`))) ⇒
          // remove
          put(key, None)
        case _ ⇒
      }

    case state: CurrentClusterState ⇒
      nodes = state.members.collect {
        case m if m.status != MemberStatus.Joining && matchingRole(m) ⇒ m.address
      }

    case MemberUp(m) ⇒
      if (matchingRole(m))
        nodes += m.address

    case MemberRemoved(m, _) ⇒
      if (m.address == selfAddress)
        context stop self
      else if (matchingRole(m)) {
        nodes -= m.address
        registry -= m.address
      }

    case _: MemberEvent ⇒ // not of interest

    case Count ⇒
      // number of entries, over all buckets, that have not been removed
      val count = registry.map {
        case (_, bucket) ⇒ bucket.content.count {
          case (_, valueHolder) ⇒ valueHolder.ref.isDefined
        }
      }.sum
      sender() ! count
  }

  /**
   * Forward `msg` to every live entry registered under `path`, over all buckets.
   *
   * @param allButSelf when true, the bucket owned by this node is skipped
   */
  def publish(path: String, msg: Any, allButSelf: Boolean = false): Unit = {
    for {
      (address, bucket) ← registry
      if !(allButSelf && address == selfAddress) // if we should skip sender() node and current address == self address => skip
      valueHolder ← bucket.content.get(path)
      ref ← valueHolder.ref
    } ref forward msg
  }

  /**
   * Store `valueOption` under `key` in the bucket owned by this node, stamping
   * both the entry and the bucket with a new version. `None` records a removal.
   */
  def put(key: String, valueOption: Option[ActorRef]): Unit = {
    val bucket = registry(selfAddress)
    val v = nextVersion()
    registry += (selfAddress -> bucket.copy(version = v,
      content = bucket.content + (key -> ValueHolder(v, valueOption))))
  }

  /** Registry key of an actor: its path without address information. */
  def mkKey(ref: ActorRef): String = mkKey(ref.path)

  /** Registry key of a path: the path without address information. */
  def mkKey(path: ActorPath): String = path.toStringWithoutAddress

  /** The version of each bucket currently in the registry, by owner. */
  def myVersions: Map[Address, Long] = registry.map { case (owner, bucket) ⇒ (owner -> bucket.version) }

  /**
   * Collect, for every bucket where this node has a newer version than
   * `otherVersions` reports, the entries that are newer than the reported
   * version. Once the running entry count would exceed `maxDeltaElements`
   * the bucket is truncated to its lowest-versioned entries, its version is
   * set to the highest version included, and no further buckets are added.
   */
  def collectDelta(otherVersions: Map[Address, Long]): immutable.Iterable[Bucket] = {
    // missing entries are represented by version 0
    val filledOtherVersions = myVersions.map { case (k, _) ⇒ k -> 0L } ++ otherVersions
    var count = 0
    filledOtherVersions.collect {
      case (owner, v) if registry(owner).version > v && count < maxDeltaElements ⇒
        val bucket = registry(owner)
        val deltaContent = bucket.content.filter {
          case (_, value) ⇒ value.version > v
        }
        count += deltaContent.size
        if (count <= maxDeltaElements)
          bucket.copy(content = deltaContent)
        else {
          // exceeded the maxDeltaElements, pick the elements with lowest versions
          val sortedContent = deltaContent.toVector.sortBy(_._2.version)
          // the guard above ensures there is room for at least one element, so chunk is non-empty
          val chunk = sortedContent.take(maxDeltaElements - (count - sortedContent.size))
          bucket.copy(content = chunk.toMap, version = chunk.last._2.version)
        }
    }
  }

  /** True when `otherVersions` reports, for any owner, a version newer than the local bucket's. */
  def otherHasNewerVersions(otherVersions: Map[Address, Long]): Boolean =
    otherVersions.exists {
      case (owner, v) ⇒ v > registry(owner).version
    }

  /**
   * Gossip to peer nodes.
   */
  def gossip(): Unit = selectRandomNode((nodes - selfAddress).toVector) foreach gossipTo

  /** Send the local bucket versions to the actor with this actor's path at `address`. */
  def gossipTo(address: Address): Unit = {
    context.actorSelection(self.path.toStringWithAddress(address)) ! Status(versions = myVersions)
  }

  /** Pick one of `addresses` at random, or `None` when there are none. */
  def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
    if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))

  /**
   * Drop removed entries (those without a ref) whose version is more than
   * `removedTimeToLive` older than the version of their bucket.
   */
  def prune(): Unit = {
    registry foreach {
      case (owner, bucket) ⇒
        val oldRemoved = bucket.content.collect {
          case (key, ValueHolder(version, None)) if (bucket.version - version > removedTimeToLiveMillis) ⇒ key
        }
        if (oldRemoved.nonEmpty)
          registry += owner -> bucket.copy(content = bucket.content -- oldRemoved)
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Extension that starts a [[DistributedPubSubMediator]] actor
 * with settings defined in config section `akka.contrib.cluster.pub-sub`.
 *
 * This object is the extension id; the extension instance is created once per
 * actor system via [[#createExtension]].
 */
object DistributedPubSubExtension extends ExtensionId[DistributedPubSubExtension] with ExtensionIdProvider {

  /** Overridden only to narrow the return type to [[DistributedPubSubExtension]]. */
  override def get(system: ActorSystem): DistributedPubSubExtension = super.get(system)

  /** This object is its own lookup key. */
  override def lookup = DistributedPubSubExtension

  /** Create the extension instance for `system`. */
  override def createExtension(system: ExtendedActorSystem): DistributedPubSubExtension = {
    new DistributedPubSubExtension(system)
  }
}
|
|
|
|
|
|
|
|
|
|
class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension {

  private val config = system.settings.config.getConfig("akka.contrib.cluster.pub-sub")

  // an empty "role" setting means that no role is required; same rule as the Java API factories
  private val role: Option[String] =
    DistributedPubSubMediator.Internal.roleOption(config.getString("role"))

  /**
   * Returns true if the cluster node is terminated, or if this member is not
   * tagged with the role configured for the mediator. In both cases no
   * mediator actor is started, see [[#mediator]].
   */
  def isTerminated: Boolean = {
    val cluster = Cluster(system)
    cluster.isTerminated || !role.forall(cluster.selfRoles.contains)
  }

  /**
   * The [[DistributedPubSubMediator]]
   *
   * This is `system.deadLetters` when [[#isTerminated]] is true at the time the
   * extension is created. Otherwise the mediator is started as a top level
   * actor, with the name and the settings found in the config section.
   */
  val mediator: ActorRef = {
    if (isTerminated)
      system.deadLetters
    else {
      val routingLogic = config.getString("routing-logic") match {
        case "random"             ⇒ RandomRoutingLogic()
        case "round-robin"        ⇒ RoundRobinRoutingLogic()
        case "consistent-hashing" ⇒ ConsistentHashingRoutingLogic(system)
        case "broadcast"          ⇒ BroadcastRoutingLogic()
        case other                ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]")
      }
      val gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis
      val removedTimeToLive = config.getDuration("removed-time-to-live", MILLISECONDS).millis
      val maxDeltaElements = config.getInt("max-delta-elements")
      val name = config.getString("name")
      system.actorOf(DistributedPubSubMediator.props(role, routingLogic, gossipInterval, removedTimeToLive, maxDeltaElements),
        name)
    }
  }
}
|