diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto
index b0a1ffe6e9..a2575b0a76 100644
--- a/akka-cluster/src/main/protobuf/ClusterMessages.proto
+++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto
@@ -150,7 +150,8 @@ message VectorClock {
required int32 hashIndex = 1;
required int64 timestamp = 2;
}
- required int64 timestamp = 1;
+ // the timestamp could be removed but left for test data compatibility
+ optional int64 timestamp = 1;
repeated Version versions = 2;
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 99d261f012..a8233110bd 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -9,7 +9,6 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
-import java.util.UUID
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop
@@ -215,15 +214,17 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
/**
* INTERNAL API.
*/
-private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
+private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
val cluster = Cluster(context.system)
- import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector }
+ import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._
import cluster.InfoLogger._
+ protected def selfUniqueAddress = cluster.selfUniqueAddress
+
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
@@ -549,49 +550,63 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (latestGossip.members.forall(_.uniqueAddress != from))
logInfo("Ignoring received gossip status from unknown [{}]", from)
else {
- (status.version tryCompareTo latestGossip.version) match {
- case Some(0) ⇒ // same version
- case Some(x) if x > 0 ⇒ gossipStatusTo(from, sender) // remote is newer
- case _ ⇒ gossipTo(from, sender) // conflicting or local is newer
+ (status.version compareTo latestGossip.version) match {
+ case VectorClock.Same ⇒ // same version
+ case VectorClock.After ⇒ gossipStatusTo(from, sender) // remote is newer
+ case _ ⇒ gossipTo(from, sender) // conflicting or local is newer
}
}
}
+ /**
+ * The types of gossip actions that receive gossip has performed.
+ */
+ sealed trait ReceiveGossipType
+ case object Ignored extends ReceiveGossipType
+ case object Older extends ReceiveGossipType
+ case object Newer extends ReceiveGossipType
+ case object Same extends ReceiveGossipType
+ case object Merge extends ReceiveGossipType
+
/**
* Receive new gossip.
*/
- def receiveGossip(envelope: GossipEnvelope): Unit = {
+ def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = {
val from = envelope.from
val remoteGossip = envelope.gossip
val localGossip = latestGossip
- if (envelope.to != selfUniqueAddress)
+ if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
- if (remoteGossip.overview.unreachable.exists(_.address == selfAddress))
+ Ignored
+ } else if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address)
- else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from))
+ Ignored
+ } else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
- else if (localGossip.members.forall(_.uniqueAddress != from))
+ Ignored
+ } else if (localGossip.members.forall(_.uniqueAddress != from)) {
logInfo("Ignoring received gossip from unknown [{}]", from)
- else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress))
+ Ignored
+ } else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) {
logInfo("Ignoring received gossip that does not contain myself, from [{}]", from)
- else {
+ Ignored
+ } else {
+ val comparison = remoteGossip.version compareTo localGossip.version
- val comparison = remoteGossip.version tryCompareTo localGossip.version
-
- val (winningGossip, talkback) = comparison match {
- case None ⇒
+ val (winningGossip, talkback, gossipType) = comparison match {
+ case VectorClock.Concurrent ⇒
// conflicting versions, merge
- (remoteGossip merge localGossip, true)
- case Some(0) ⇒
+ (remoteGossip merge localGossip, true, Merge)
+ case VectorClock.Same ⇒
// same version
- (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress))
- case Some(x) if x < 0 ⇒
+ (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same)
+ case VectorClock.Before ⇒
// local is newer
- (localGossip, true)
- case _ ⇒
+ (localGossip, true, Older)
+ case VectorClock.After ⇒
// remote is newer
- (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress))
+ (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer)
}
latestGossip = winningGossip seen selfUniqueAddress
@@ -603,18 +618,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
- if (comparison.isEmpty) {
+ if (comparison == VectorClock.Concurrent) {
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
}
if (statsEnabled) {
- gossipStats = comparison match {
- case None ⇒ gossipStats.incrementMergeCount
- case Some(0) ⇒ gossipStats.incrementSameCount
- case Some(x) if x < 0 ⇒ gossipStats.incrementNewerCount
- case _ ⇒ gossipStats.incrementOlderCount
+ gossipStats = gossipType match {
+ case Merge ⇒ gossipStats.incrementMergeCount
+ case Same ⇒ gossipStats.incrementSameCount
+ case Newer ⇒ gossipStats.incrementNewerCount
+ case Older ⇒ gossipStats.incrementOlderCount
}
vclockStats = VectorClockStats(
versionSize = latestGossip.version.versions.size,
@@ -630,6 +645,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// older or sender had newer
gossipTo(from, sender)
}
+ gossipType
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
index 4bae2006bc..fa51fa1adb 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
@@ -5,6 +5,7 @@
package akka.cluster
import scala.collection.immutable
+import scala.collection.immutable.TreeMap
import MemberStatus._
/**
@@ -61,8 +62,7 @@ private[cluster] object Gossip {
private[cluster] case class Gossip(
members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address
overview: GossipOverview = GossipOverview(),
- version: VectorClock = VectorClock()) // vector clock version
- extends Versioned[Gossip] {
+ version: VectorClock = VectorClock()) { // vector clock version
// TODO can be disabled as optimization
assertInvariants()
@@ -125,8 +125,10 @@ private[cluster] case class Gossip(
overview.seen.get(node).exists(_ == version)
}
- private def mergeSeenTables(allowed: Set[Member], one: Map[UniqueAddress, VectorClock], another: Map[UniqueAddress, VectorClock]): Map[UniqueAddress, VectorClock] = {
- (Map.empty[UniqueAddress, VectorClock] /: allowed) {
+ private def mergeSeenTables(allowed: immutable.SortedSet[Member],
+ one: TreeMap[UniqueAddress, VectorClock],
+ another: TreeMap[UniqueAddress, VectorClock]): TreeMap[UniqueAddress, VectorClock] =
+ (TreeMap.empty[UniqueAddress, VectorClock] /: allowed) {
(merged, member) ⇒
val node = member.uniqueAddress
(one.get(node), another.get(node)) match {
@@ -134,14 +136,13 @@ private[cluster] case class Gossip(
case (Some(v1), None) ⇒ merged.updated(node, v1)
case (None, Some(v2)) ⇒ merged.updated(node, v2)
case (Some(v1), Some(v2)) ⇒
- v1 tryCompareTo v2 match {
- case None ⇒ merged
- case Some(x) if x > 0 ⇒ merged.updated(node, v1)
- case _ ⇒ merged.updated(node, v2)
+ v1 compareTo v2 match {
+ case VectorClock.Same | VectorClock.After ⇒ merged.updated(node, v1)
+ case VectorClock.Before ⇒ merged.updated(node, v2)
+ case VectorClock.Concurrent ⇒ merged
}
}
}
- }
/**
* Merges the seen table of two Gossip instances.
@@ -236,7 +237,7 @@ private[cluster] case class Gossip(
*/
@SerialVersionUID(1L)
private[cluster] case class GossipOverview(
- seen: Map[UniqueAddress, VectorClock] = Map.empty,
+ seen: TreeMap[UniqueAddress, VectorClock] = TreeMap.empty,
unreachable: Set[Member] = Set.empty) {
override def toString =
diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala
index 9d0ded1a34..2fa1cfdecf 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Member.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala
@@ -118,13 +118,7 @@ object Member {
*/
implicit val ordering: Ordering[Member] = new Ordering[Member] {
def compare(a: Member, b: Member): Int = {
- val result = addressOrdering.compare(a.address, b.address)
- if (result == 0) {
- val aUid = a.uniqueAddress.uid
- val bUid = b.uniqueAddress.uid
- if (aUid < bUid) -1 else if (aUid == bUid) 0 else 1
- } else
- result
+ a.uniqueAddress compare b.uniqueAddress
}
}
@@ -218,4 +212,13 @@ object MemberStatus {
* INTERNAL API
*/
@SerialVersionUID(1L)
-private[cluster] case class UniqueAddress(address: Address, uid: Int)
+private[cluster] case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] {
+ @transient
+ override lazy val hashCode = scala.util.hashing.MurmurHash3.productHash(this)
+
+ override def compare(that: UniqueAddress): Int = {
+ val result = Member.addressOrdering.compare(this.address, that.address)
+ if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1
+ else result
+ }
+}
diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
index 8e53416810..1690f1cd73 100644
--- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala
@@ -5,87 +5,29 @@
package akka.cluster
import akka.AkkaException
-import akka.event.Logging
-import akka.actor.ActorSystem
import System.{ currentTimeMillis ⇒ newTimestamp }
import java.security.MessageDigest
import java.util.concurrent.atomic.AtomicLong
-
-class VectorClockException(message: String) extends AkkaException(message)
-
-/**
- * Trait to be extended by classes that wants to be versioned using a VectorClock.
- */
-trait Versioned[T] {
- def version: VectorClock
- def :+(node: VectorClock.Node): T
-}
-
-/**
- * Utility methods for comparing Versioned instances.
- */
-object Versioned {
-
- /**
- * The result of comparing two Versioned objects.
- * Either:
- * {{{
- * 1) v1 is BEFORE v2 => Before
- * 2) v1 is AFTER t2 => After
- * 3) v1 happens CONCURRENTLY to v2 => Concurrent
- * }}}
- */
- sealed trait Ordering
- case object Before extends Ordering
- case object After extends Ordering
- case object Concurrent extends Ordering
-
- /**
- * Returns or 'Ordering' for the two 'Versioned' instances.
- */
- def compare[T <: Versioned[T]](versioned1: Versioned[T], versioned2: Versioned[T]): Ordering = {
- if (versioned1.version <> versioned2.version) Concurrent
- else if (versioned1.version < versioned2.version) Before
- else After
- }
-
- /**
- * Returns the Versioned that have the latest version.
- */
- def latestVersionOf[T <: Versioned[T]](versioned1: T, versioned2: T): T = {
- compare(versioned1, versioned2) match {
- case Concurrent ⇒ versioned2
- case Before ⇒ versioned2
- case After ⇒ versioned1
- }
- }
-}
+import scala.collection.immutable.TreeMap
/**
* VectorClock module with helper classes and methods.
*
* Based on code from the 'vlock' VectorClock library by Coda Hale.
*/
-object VectorClock {
+private[cluster] object VectorClock {
/**
* Hash representation of a versioned node name.
*/
- sealed trait Node extends Serializable {
- def hash: String
- }
+ type Node = String
object Node {
- @SerialVersionUID(1L)
- private case class NodeImpl(name: String) extends Node {
- override def toString(): String = "Node(" + name + ")"
- override def hash: String = name
- }
- def apply(name: String): Node = NodeImpl(hash(name))
+ def apply(name: String): Node = hash(name)
- def fromHash(hash: String): Node = NodeImpl(hash)
+ def fromHash(hash: String): Node = hash
private def hash(name: String): String = {
val digester = MessageDigest.getInstance("MD5")
@@ -98,11 +40,8 @@ object VectorClock {
* Timestamp representation a unique 'Ordered' timestamp.
*/
@SerialVersionUID(1L)
- case class Timestamp(time: Long) extends Ordered[Timestamp] {
- def max(other: Timestamp) = {
- if (this < other) other
- else this
- }
+ final case class Timestamp(time: Long) extends Ordered[Timestamp] {
+ def max(other: Timestamp) = if (this < other) other else this
def compare(other: Timestamp) = time compare other.time
@@ -127,6 +66,12 @@ object VectorClock {
new Timestamp(newTime)
}
}
+
+ sealed trait Ordering
+ case object After extends Ordering
+ case object Before extends Ordering
+ case object Same extends Ordering
+ case object Concurrent extends Ordering
}
/**
@@ -141,9 +86,7 @@ object VectorClock {
*/
@SerialVersionUID(1L)
case class VectorClock(
- timestamp: VectorClock.Timestamp = VectorClock.Timestamp(),
- versions: Map[VectorClock.Node, VectorClock.Timestamp] = Map.empty[VectorClock.Node, VectorClock.Timestamp])
- extends PartiallyOrdered[VectorClock] {
+ versions: TreeMap[VectorClock.Node, VectorClock.Timestamp] = TreeMap.empty[VectorClock.Node, VectorClock.Timestamp]) {
import VectorClock._
@@ -155,7 +98,17 @@ case class VectorClock(
/**
* Returns true if this and that are concurrent else false.
*/
- def <>(that: VectorClock): Boolean = tryCompareTo(that) == None
+ def <>(that: VectorClock): Boolean = compareTo(that) == Concurrent
+
+ /**
+ * Returns true if this is before that else false.
+ */
+ def <(that: VectorClock): Boolean = compareTo(that) == Before
+
+ /**
+ * Returns true if this is after that else false.
+ */
+ def >(that: VectorClock): Boolean = compareTo(that) == After
/**
* Returns true if this VectorClock has the same history as the 'that' VectorClock else false.
@@ -163,39 +116,37 @@ case class VectorClock(
def ==(that: VectorClock): Boolean = versions == that.versions
/**
- * For the 'PartiallyOrdered' trait, to allow natural comparisons using <, > and ==.
- *