Moved FIXMEs into tickets. Hardened convergence.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-03-12 19:22:02 +01:00
parent dc80315b31
commit cf3fa9fa3c

View file

@ -29,20 +29,16 @@ import com.google.protobuf.ByteString
* Interface for membership change listener.
*/
trait MembershipChangeListener {
// FIXME bad for Java - convert to Array?
def notify(members: SortedSet[Member]): Unit
}
/**
* Interface for meta data change listener.
*/
trait MetaDataChangeListener { // FIXME add management and notification for MetaDataChangeListener
// FIXME bad for Java - convert to what?
trait MetaDataChangeListener {
def notify(meta: Map[String, Array[Byte]]): Unit
}
// FIXME create Protobuf messages out of all the Gossip stuff - but wait until the prototol is fully stablized.
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
*/
@ -294,7 +290,7 @@ final class ClusterDaemonSupervisor extends Actor {
def receive = Actor.emptyBehavior
override def unhandled(unknown: Any): Unit = log.error("/system/cluster can not respond to messages - received [{}]", unknown)
override def unhandled(unknown: Any): Unit = log.error("[/system/cluster] can not respond to messages - received [{}]", unknown)
}
/**
@ -376,7 +372,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
private val log = Logging(system, "Node")
private val random = SecureRandom.getInstance("SHA1PRNG")
log.info("Node [{}] - Starting cluster Node...", remoteAddress)
log.info("Node [{}] - is JOINING cluster...", remoteAddress)
// create superisor for daemons under path "/system/cluster"
private val clusterDaemons = {
@ -415,7 +411,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
leaderActions()
}
log.info("Node [{}] - Cluster Node started successfully", remoteAddress)
log.info("Node [{}] - have JOINED cluster successfully", remoteAddress)
// ======================================================
// ===================== PUBLIC API =====================
@ -464,7 +460,6 @@ class Node(system: ExtendedActorSystem) extends Extension {
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*/
def shutdown() {
// FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
if (isRunning.compareAndSet(true, false)) {
log.info("Node [{}] - Shutting down Node and cluster daemons...", remoteAddress)
gossipCanceller.cancel()
@ -534,7 +529,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
*/
@tailrec
private[cluster] final def joining(node: Address) {
log.info("Node [{}] - Node [{}] is joining", remoteAddress, node)
log.info("Node [{}] - Node [{}] is JOINING", remoteAddress, node)
val localState = state.get
val localGossip = localState.latestGossip
@ -567,28 +562,28 @@ class Node(system: ExtendedActorSystem) extends Extension {
* State transition to UP.
*/
private[cluster] final def up(address: Address) {
// FIXME implement me
log.info("Node [{}] - Marking node [{}] as UP", remoteAddress, address)
}
/**
* State transition to LEAVING.
*/
private[cluster] final def leaving(address: Address) {
// FIXME implement me
log.info("Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
}
/**
* State transition to EXITING.
*/
private[cluster] final def exiting(address: Address) {
// FIXME implement me
log.info("Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
}
/**
* State transition to REMOVED.
*/
private[cluster] final def removing(address: Address) {
// FIXME implement me
log.info("Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
}
/**
@ -908,8 +903,6 @@ class Node(system: ExtendedActorSystem) extends Extension {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
// FIXME Should we let the leader auto-down every run (as it is now) or just every X seconds? So we can wait for user to invoke explicit DOWN.
// 3. Move UNREACHABLE => DOWN (auto-downing by leader)
val newUnreachableMembers =
localUnreachableMembers
@ -921,7 +914,6 @@ class Node(system: ExtendedActorSystem) extends Extension {
}
// removing nodes marked as DOWN from the 'seen' table
// FIXME this needs to be done if user issues DOWN as well
val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) currentSeen - member.address)
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
@ -965,7 +957,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
// 2. all unreachable members in the set have status DOWN
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
if (unreachable.isEmpty || !unreachable.exists(_.status != MemberStatus.Down)) {
if (unreachable.isEmpty || !unreachable.exists(m m.status != MemberStatus.Down || m.status != MemberStatus.Removed)) {
val seen = gossip.overview.seen
val views = Set.empty[VectorClock] ++ seen.values