// diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala
// new file mode 100644
// index 0000000000..1164df5009
// --- /dev/null
// +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala
// @@ -0,0 +1,143 @@
/**
 * Copyright (C) 2009-2011 Typesafe Inc.
 */

package akka.remote

import akka.util.duration._
import akka.actor.Scheduler

import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit

import scala.collection.immutable.Map
import scala.annotation.tailrec
import scala.util.Random

/**
 * This module is responsible for gossiping cluster information. The abstraction
 * maintains the list of live and dead nodes. Periodically, i.e. every 1 second, this module
 * chooses a random node and initiates a round of gossip with it. Whenever this module hears a gossip
 * it updates the Failure Detector with the liveness information.
 *
 * The implementation is based on this paper by Amazon [http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf].
 *
 * Gossip timer task runs every second.
 *
 * During each of these runs the node initiates gossip exchange according to the following rules (as defined in the
 * Cassandra documentation):
 *
 *   1) Gossip to random live node (if any)
 *   2) Gossip to random unreachable node with certain probability depending on number of unreachable and live nodes
 *   3) If the node gossiped to at (1) was not seed, or the number of live nodes is less than number of seeds,
 *       gossip to random seed with certain probability depending on number of unreachable, seed and live nodes.
 *
 * NOTE: this is work in progress; `sendGossip` and `scrutinizeCluster` are still stubs.
 *
 * @param failureDetector the failure detector of this node (not yet used by the visible code)
 * @param address the address of this node; also used as the single seed until seeds are configurable
 */
class Gossiper(failureDetector: AccrualFailureDetector, address: InetSocketAddress) {
  import Gossiper._ // brings NodeStateChangeListeners / ApplicationStateChangePublishers into scope

  // FIXME make configurable
  // (the misspelled name 'inital' is kept because it is a public member)
  val initalDelayForGossip = 5 seconds
  val gossipFrequency = 1 seconds
  val timeUnit = TimeUnit.SECONDS

  val seeds = Vector(address) // FIXME read in list of seeds from config

  // Implement using optimistic lockless concurrency, all state is represented
  // by this immutable case class and managed by an AtomicReference.
  //
  // NOTE(review): the original left the per-publisher value type blank (`Map[]`). `AnyRef` is a
  // placeholder that matches `ApplicationStateChangePublishers.value` - confirm the intended type.
  private case class State(
    nodeStates: Map[InetSocketAddress, Map[String, AnyRef]] = Map(address -> Map.empty[String, AnyRef]),
    aliveNodes: Vector[InetSocketAddress] = Vector.empty[InetSocketAddress],
    deadNodes: Vector[InetSocketAddress] = Vector.empty[InetSocketAddress],
    nodeStateChangeListeners: Vector[NodeStateChangeListeners] = Vector.empty[NodeStateChangeListeners],
    applicationStateChangePublishers: Vector[ApplicationStateChangePublishers] = Vector.empty[ApplicationStateChangePublishers],
    versions: Map[String, Long] = Map.empty[String, Long],
    generation: Long = System.currentTimeMillis)

  private val state = new AtomicReference[State](State())

  // The scheduler is handed an explicit TimeUnit, so it is given plain numbers expressed in that unit.
  // NOTE(review): assumes Scheduler.schedule(f, initialDelay: Long, delay: Long, timeUnit) - confirm.
  Scheduler.schedule(() => initateGossipExchange(), inTimeUnit(initalDelayForGossip), inTimeUnit(gossipFrequency), timeUnit)
  // the scrutinizer starts one time unit after the gossip task
  Scheduler.schedule(() => scrutinizeCluster(), inTimeUnit(initalDelayForGossip) + 1, inTimeUnit(gossipFrequency), timeUnit)

  /**
   * Expresses the duration as a number of `timeUnit`s.
   */
  private def inTimeUnit(duration: akka.util.Duration): Long = timeUnit.convert(duration.length, duration.unit)

  /**
   * Uniformly distributed random number in [0.0, 1.0).
   */
  private def random(): Double = Random.nextDouble()

  /**
   * Bumps the version of every known application state and then runs one gossip round:
   * to a live node, possibly to a dead node and possibly to a seed node.
   * Retries from scratch if another thread updated the state in the meantime.
   */
  @tailrec
  final private def initateGossipExchange(): Unit = {
    val oldState = state.get

    val versions = oldState.versions map { case (name, version) => name -> (version + 1) }

    // FIXME publish the current value of each publisher into the state of this node, i.e.:
    // self._node_states[options.address][publisher.name()] = {key:value, "generation" : publisher.generation(), "version" : version }
    val nodeStates = oldState.nodeStates

    val newState = oldState.copy(versions = versions, nodeStates = nodeStates)

    // if we won the race then update else try again
    if (!state.compareAndSet(oldState, newState)) initateGossipExchange() // recur
    else {
      val aliveNodes = oldState.aliveNodes
      val deadNodes = oldState.deadNodes
      val nrOfAliveNodes = aliveNodes.size
      val nrOfDeadNodes = deadNodes.size

      // 1) gossip to alive nodes
      val gossipedToSeed = nrOfAliveNodes > 0 && sendGossip(aliveNodes)

      // 2) gossip to dead nodes; floating point division, integer division would truncate the probability
      if (nrOfDeadNodes > 0) {
        val probability = nrOfDeadNodes.toDouble / (nrOfAliveNodes + 1)
        if (random() < probability) sendGossip(deadNodes)
      }

      // 3) gossip to a seed for facilitating partition healing
      if (!gossipedToSeed || nrOfAliveNodes < seeds.size) {
        // skip if there is no seed or the (first) seed is this node itself
        if (seeds.headOption exists (_ != address)) {
          if (nrOfAliveNodes == 0) sendGossip(seeds)
          else {
            // nrOfAliveNodes > 0 here so the divisor is never zero
            val probability = seeds.size.toDouble / (nrOfAliveNodes + nrOfDeadNodes)
            if (random() <= probability) sendGossip(seeds)
          }
        }
      }
    }
  }

  /**
   * Gossips to one of the given nodes. Returns 'true' if it gossiped to a "seed" node.
   *
   * FIXME stub: nothing is sent yet and 'true' is always returned.
   */
  private def sendGossip(nodes: Vector[InetSocketAddress]): Boolean = {
    true
  }

  /**
   * Periodic cluster check.
   *
   * FIXME stub: swaps the state for itself, retrying if another thread updated it concurrently.
   */
  @tailrec
  final private def scrutinizeCluster(): Unit = {
    val oldState = state.get

    val newState = oldState

    // if we won the race then update else try again
    if (!state.compareAndSet(oldState, newState)) scrutinizeCluster() // recur
  }
}

object Gossiper {

  /**
   * Source of a named piece of application state that is gossiped to the other nodes.
   */
  trait ApplicationStateChangePublishers {
    def name: String
    def value: AnyRef
    // NOTE(review): declared without a result type in the original, i.e. it returns Unit;
    // a generation number (Long) was probably intended - confirm before implementations depend on it.
    def generation(): Unit
  }

  /**
   * Callbacks for membership and application state changes of a node.
   */
  trait NodeStateChangeListeners {
    def onJoin(node: InetSocketAddress): Unit
    def onAlive(node: InetSocketAddress): Unit
    def onDead(node: InetSocketAddress): Unit
    def onChange(node: InetSocketAddress, name: String, oldValue: AnyRef, newValue: AnyRef): Unit
  }
}
// diff --git a/akka-remote/src/main/scala/akka/remote/VectorClock.scala b/akka-remote/src/main/scala/akka/remote/VectorClock.scala
// new file mode 100644
// index 0000000000..e0373a21e3
// --- /dev/null
// +++ b/akka-remote/src/main/scala/akka/remote/VectorClock.scala
// @@ -0,0 +1,113 @@
/**
 * Copyright (C) 2009-2011 Typesafe
Inc.
 */

package akka.remote

import akka.AkkaException

/**
 * Thrown when a vector clock would exceed `VectorClock.MaxNrOfVersions` entries.
 */
class VectorClockException(message: String) extends AkkaException(message)

/**
 * Representation of a Vector-based clock (counting clock).
 * For details see Wikipedia: [http://en.wikipedia.org/wiki/Vector_clock].
 *
 * New entries are inserted in ascending node id order, so two clocks that have recorded
 * the same events are equal regardless of the order in which the nodes were first seen.
 *
 * @param versions one versioned entry per node that has recorded an event
 * @param timestamp time of the latest update, defaults to the creation time
 */
case class VectorClock(
  versions: Vector[VectorClock.Entry] = Vector.empty[VectorClock.Entry],
  timestamp: Long = System.currentTimeMillis) {
  import VectorClock._

  /**
   * Compares this clock with the other clock, see `VectorClock.compare`.
   */
  def compare(other: VectorClock): Ordering = VectorClock.compare(this, other)

  /**
   * Records an event for the node: increments its version, or adds an entry with
   * version 1 if the node is not yet part of this clock.
   *
   * @param nodeId the node that the event happened on
   * @param timestamp the timestamp of the returned clock
   * @return the updated clock, this clock is left untouched
   * @throws VectorClockException if the clock would hold more than `MaxNrOfVersions` entries
   */
  def incrementVersionForNode(nodeId: Int, timestamp: Long): VectorClock = {
    val newVersions =
      if (versions exists (entry ⇒ entry.nodeId == nodeId)) {
        // update existing node entry
        versions map { entry ⇒
          if (entry.nodeId == nodeId) entry.increment()
          else entry
        }
      } else {
        // create a new node entry and insert it in front of the first entry with a higher node id,
        // appending it would make the entry order depend on the order of the events
        val (lower, higher) = versions span (entry ⇒ entry.nodeId < nodeId)
        (lower :+ Entry(nodeId = nodeId)) ++ higher
      }
    if (newVersions.size > MaxNrOfVersions) throw new VectorClockException("Max number of versions reached")
    copy(versions = newVersions, timestamp = timestamp)
  }

  /**
   * The highest version of all entries, 1 if the clock has no entries.
   */
  def maxVersion: Long = versions.foldLeft(1L)((max, entry) ⇒ math.max(max, entry.version))

  /**
   * Not implemented yet, always throws a RuntimeException.
   */
  def merge(other: VectorClock): VectorClock = {
    sys.error("Not implemented")
  }
}

/**
 * Module with helper classes and methods.
 */
object VectorClock {
  final val MaxNrOfVersions = Short.MaxValue

  /**
   * The result of comparing two vector clocks.
   * Either:
   * 1) v1 is BEFORE v2
   * 2) v1 is AFTER v2
   * 3) v1 happens CONCURRENTLY to v2
   */
  sealed trait Ordering
  case object Before extends Ordering
  case object After extends Ordering
  case object Concurrently extends Ordering

  /**
   * Versioned entry in a vector clock.
   */
  case class Entry(nodeId: Int, version: Long = 1) {
    def increment(): Entry = copy(version = version + 1)
  }

  /**
   * Compare two vector clocks. A node that is missing from a clock counts as "no events seen"
   * for that node. The outcomes will be one of the following:
   *
   * 1. Clock 1 is BEFORE clock 2 if there exists an i such that c1(i) < c2(i) and there does not exist a j such that c1(j) > c2(j).
   *    Identical clocks are (arbitrarily) reported as BEFORE as well.
   * 2. Clock 1 is CONCURRENT to clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j).
   * 3. Clock 1 is AFTER clock 2 otherwise.
   *
   * The result does not depend on the order of the entries within the clocks.
   *
   * @param v1 The first VectorClock
   * @param v2 The second VectorClock
   * @throws IllegalArgumentException if any of the clocks is null
   */
  def compare(v1: VectorClock, v2: VectorClock): Ordering = {
    if ((v1 eq null) || (v2 eq null)) throw new IllegalArgumentException("Can't compare null VectorClocks")

    def versionPerNode(clock: VectorClock): Map[Int, Long] =
      clock.versions.map(entry ⇒ (entry.nodeId, entry.version)).toMap

    // true if 'left' holds an entry that 'right' is missing or only has a lower version of
    def isAhead(left: Map[Int, Long], right: Map[Int, Long]): Boolean =
      left exists {
        case (nodeId, version) ⇒ right.get(nodeId).map(otherVersion ⇒ version > otherVersion).getOrElse(true)
      }

    val versions1 = versionPerNode(v1)
    val versions2 = versionPerNode(v2)

    (isAhead(versions1, versions2), isAhead(versions2, versions1)) match {
      case (false, false) ⇒ Before // This is the case where they are equal, return BEFORE arbitrarily
      case (true, false)  ⇒ After // This is the case where v1 is a successor clock to v2
      case (false, true)  ⇒ Before // This is the case where v2 is a successor clock to v1
      case (true, true)   ⇒ Concurrently // This is the case where both clocks are parallel to one another
    }
  }
}
// diff --git a/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala b/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala
// new file mode 100644
// index 0000000000..10b8ea1714
// --- /dev/null
// +++
// b/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala
// @@ -0,0 +1,126 @@
package akka.remote

import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers

import java.net.InetSocketAddress

/**
 * Specification of how a VectorClock records events and how two clocks compare.
 */
class VectorClockSpec extends WordSpec with MustMatchers {
  import VectorClock._

  /**
   * Builds a clock by recording one event on each of the given nodes, in the given order.
   */
  private def clockWithEventsOn(nodeIds: Int*): VectorClock =
    nodeIds.foldLeft(VectorClock()) { (clock, nodeId) =>
      clock.incrementVersionForNode(nodeId, System.currentTimeMillis)
    }

  "A VectorClock" must {

    "have zero versions when created" in {
      VectorClock().versions must be(Vector())
    }

    "be able to add Entry if non-existing" in {
      clockWithEventsOn().versions must be(Vector())
      clockWithEventsOn(1, 2).versions must be(Vector(Entry(1, 1), Entry(2, 1)))
    }

    "be able to increment version of existing Entry" in {
      clockWithEventsOn(1, 2, 1, 2, 2).versions must be(Vector(Entry(1, 2), Entry(2, 3)))
    }

    "not be concurrent with another empty clock when empty" in {
      VectorClock().compare(VectorClock()) must not be (Concurrently)
    }

    "not be concurrent with an identical clock" in {
      val clock1 = clockWithEventsOn(1, 2, 1)
      val clock2 = clockWithEventsOn(1, 2, 1)

      clock1.compare(clock2) must not be (Concurrently)
    }

    "happen before an identical clock with a single additional event" in {
      val clock1 = clockWithEventsOn(1, 2, 1)
      val clock2 = clockWithEventsOn(1, 2, 1, 3)

      clock1.compare(clock2) must be(Before)
    }

    "be concurrent with a clock that only has events on another node" in {
      val clock1 = clockWithEventsOn(1)
      val clock2 = clockWithEventsOn(2)

      clock1.compare(clock2) must be(Concurrently)
    }

    "be concurrent with a clock that shares some events but has diverged" in {
      val clock1 = clockWithEventsOn(1, 2, 1)
      val clock2 = clockWithEventsOn(1, 1, 3)

      clock1.compare(clock2) must be(Concurrently)
    }

    "happen before a clock that has all its events plus events on other nodes" in {
      val clock1 = clockWithEventsOn(2, 2)
      val clock2 = clockWithEventsOn(1, 2, 2, 3)

      clock1.compare(clock2) must be(Before)
    }

    "happen after a clock whose events it fully contains" in {
      val clock1 = clockWithEventsOn(1, 2, 2, 3)
      val clock2 = clockWithEventsOn(2, 2)

      clock1.compare(clock2) must be(After)
    }
  }
}
// \ No newline at end of file