/**
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.contrib.pattern

import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.Actor.Receive
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.FSM
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.AkkaException
object ClusterSingletonManager {

  /**
   * Internal API
   * public due to the `with FSM` type parameters
   */
  sealed trait State

  /**
   * Internal API
   * public due to the `with FSM` type parameters
   */
  sealed trait Data

  /**
   * Internal API
   */
  private object Internal {
    /**
     * Sent from the new leader to the previous leader to initiate the
     * hand-over process. `HandOverInProgress` and `HandOverDone`
     * are expected replies.
     */
    case object HandOverToMe

    /**
     * Confirmation by the previous leader that the hand-over process,
     * i.e. shut down of the singleton actor, has started.
     */
    case object HandOverInProgress

    /**
     * Confirmation by the previous leader that the singleton
     * actor has been terminated and the hand-over process is
     * completed. The `handOverData` holds the message, if any,
     * sent from the singleton actor to its parent ClusterSingletonManager
     * when shutting down. It is passed to the `singletonProps`
     * factory on the new leader node.
     */
    case class HandOverDone(handOverData: Option[Any])

    /**
     * Sent from the previous leader to the new leader to
     * initiate the normal hand-over process.
     * Especially useful when a new node joins and becomes
     * leader immediately, without knowing who was the previous
     * leader.
     */
    case object TakeOverFromMe
    case class HandOverRetry(count: Int)
    case class TakeOverRetry(leaderPeer: ActorRef, count: Int)
    case object Cleanup
    case object StartLeaderChangedBuffer

    case object Start extends State
    case object Leader extends State
    case object NonLeader extends State
    case object BecomingLeader extends State
    case object WasLeader extends State
    case object HandingOver extends State
    case object TakeOver extends State

    case object Uninitialized extends Data
    case class NonLeaderData(leaderOption: Option[Address]) extends Data
    case class BecomingLeaderData(previousLeaderOption: Option[Address]) extends Data
    case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false,
                          handOverData: Option[Any] = None) extends Data
    case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any],
                             newLeader: Address) extends Data
    case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data

    val HandOverRetryTimer = "hand-over-retry"
    val TakeOverRetryTimer = "take-over-retry"
    val CleanupTimer = "cleanup"
    object LeaderChangedBuffer {
      /**
       * Request to deliver one more event.
       */
      case object GetNext
      /**
       * The first event, corresponding to CurrentClusterState.
       */
      case class InitialLeaderState(leader: Option[Address], memberCount: Int)
    }
    /**
     * Notifications of [[akka.cluster.ClusterEvent.LeaderChanged]] are tunneled
     * via this actor (child of ClusterSingletonManager) to be able to deliver
     * one change at a time. Avoiding simultaneous leader changes simplifies
     * the process in ClusterSingletonManager. ClusterSingletonManager requests
     * the next event with `GetNext` when it is ready for it. Only one outstanding
     * `GetNext` request is allowed. Incoming events are buffered and delivered
     * upon `GetNext` request.
     */
    class LeaderChangedBuffer extends Actor {
      import LeaderChangedBuffer._
      import context.dispatcher

      val cluster = Cluster(context.system)
      var changes = Vector.empty[AnyRef]
      var memberCount = 0

      // subscribe to LeaderChanged, re-subscribe on restart
      override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged])
      override def postStop(): Unit = cluster.unsubscribe(self)

      def receive = {
        case state: CurrentClusterState ⇒
          changes :+= InitialLeaderState(state.leader, state.members.size)
        case event: LeaderChanged ⇒
          changes :+= event
        case GetNext if changes.isEmpty ⇒
          context.become(deliverNext, discardOld = false)
        case GetNext ⇒
          val event = changes.head
          changes = changes.tail
          context.parent ! event
      }

      // the buffer was empty when GetNext was received, deliver next event immediately
      def deliverNext: Actor.Receive = {
        case state: CurrentClusterState ⇒
          context.parent ! InitialLeaderState(state.leader, state.members.size)
          context.unbecome()
        case event: LeaderChanged ⇒
          context.parent ! event
          context.unbecome()
      }

    }

  }

}
/**
 * Java API. Factory for the [[akka.actor.Props]] of the singleton
 * actor instance. Used in the constructor of
 * [[akka.contrib.pattern.ClusterSingletonManager]].
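 *
 * A minimal sketch of an implementation, for illustration only; the
 * `Consumer` actor and its constructor parameter are hypothetical:
 * {{{
 * new ClusterSingletonPropsFactory {
 *   def create(handOverData: Any): Props =
 *     Props(new Consumer(Option(handOverData)))
 * }
 * }}}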
 */
@SerialVersionUID(1L)
trait ClusterSingletonPropsFactory extends Serializable {
  /**
   * Create the `Props` from the `handOverData` sent from the
   * previous singleton. `handOverData` might be null
   * when no hand-over took place, or when there is no need
   * to send data to the new singleton.
   */
  def create(handOverData: Any): Props
}
/**
 * Thrown when a consistent state can't be determined within the
 * defined retry limits. Eventually the cluster will reach a stable
 * state and the manager can continue, and that is simplified by
 * starting over with a clean state. The parent supervisor should
 * typically restart the actor, i.e. the default supervision decision.
 */
class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(message, null)
/**
 * Manages a cluster-wide singleton actor instance, i.e.
 * at most one singleton instance is running at any point in time.
 * The ClusterSingletonManager is supposed to be started on all
 * nodes in the cluster with `actorOf`. The actual singleton is
 * started on the leader node of the cluster by creating a child
 * actor from the supplied `singletonProps`.
 *
 * The singleton actor is always running on the leader member, which is
 * nothing more than the address currently sorted first in the member
 * ring. This can change when adding or removing members. A graceful
 * hand-over can normally be performed when a new node joins and becomes
 * leader, or when the current leader node is removed. Be aware that there
 * is a short time period when there is no active singleton during the
 * hand-over process.
 *
 * The singleton actor can at any time send a message to its parent
 * ClusterSingletonManager and this message will be passed to the
 * `singletonProps` factory on the new leader node when a graceful
 * hand-over is performed.
 *
 * The cluster failure detector will notice when a leader node
 * becomes unreachable due to things like JVM crash, hard shut down,
 * or network failure. Then a new leader node will take over and a
 * new singleton actor is created. For these failure scenarios there
 * will not be a graceful hand-over, but more than one active singleton
 * is prevented by all reasonable means. Some corner cases are eventually
 * resolved by configurable timeouts.
 *
 * You access the singleton actor with `actorFor` using the names you have
 * specified when creating the ClusterSingletonManager. You can subscribe to
 * [[akka.cluster.ClusterEvent.LeaderChanged]] to keep track of which node
 * it is supposed to be running on. Alternatively the singleton actor may
 * broadcast its existence when it is started.
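 *
 * ==Usage==
 *
 * A minimal sketch of how the manager is typically started on every node,
 * assuming an [[akka.actor.ActorSystem]] `system`; the `Consumer` actor,
 * the `End` termination message and the actor names are illustrative only:
 * {{{
 * system.actorOf(Props(new ClusterSingletonManager(
 *   singletonProps = handOverData ⇒ Props(new Consumer(handOverData)),
 *   singletonName = "consumer",
 *   terminationMessage = End)),
 *   name = "singleton")
 * }}}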
 *
 * ==Arguments==
 *
 * '''''singletonProps''''' Factory for [[akka.actor.Props]] of the
 * singleton actor instance. The `Option` parameter is the
 * `handOverData` sent from the previous singleton. `handOverData`
 * might be None when no hand-over took place, or when there
 * is no need to send data to the new singleton. The `handOverData`
 * is typically passed as parameter to the constructor of the
 * singleton actor.
 *
 * '''''singletonName''''' The actor name of the child singleton actor.
 *
 * '''''terminationMessage''''' When handing over to a new leader node
 * this `terminationMessage` is sent to the singleton actor to tell
 * it to finish its work, close resources, and stop. It can send
 * a message back to the parent ClusterSingletonManager, which will be
 * passed to the `singletonProps` factory on the new leader node.
 * The hand-over to the new leader node is completed when the
 * singleton actor is terminated.
 * Note that [[akka.actor.PoisonPill]] is a perfectly fine
 * `terminationMessage` if you only need to stop the actor.
 *
 * '''''maxHandOverRetries''''' When a node is becoming leader it sends
 * a hand-over request to the previous leader. This is retried with the
 * `retryInterval` until the previous leader confirms that the hand-over
 * has started, or this `maxHandOverRetries` limit has been
 * reached. If the retry limit is reached it takes the decision to be
 * the new leader if the previous leader is unknown (typically removed or
 * downed), otherwise it initiates a new round by throwing
 * [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting
 * a restart with fresh state. For a cluster with many members you might
 * need to increase this retry limit because it takes more time to
 * propagate changes across all nodes.
 *
 * '''''maxTakeOverRetries''''' When a leader node is not leader any more
 * it sends a take-over request to the new leader to initiate the normal
 * hand-over process. This is especially useful when a new node joins and
 * becomes leader immediately, without knowing who was the previous leader.
 * This is retried with the `retryInterval` until this retry limit has been
 * reached. If the retry limit is reached it initiates a new round by throwing
 * [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting
 * a restart with fresh state. This will also cause the singleton actor to be
 * stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to
 * ensure that the new leader doesn't start the singleton actor before the
 * previous one is stopped, for certain corner cases.
 *
 * '''''loggingEnabled''''' Logging of what is going on at info log level.
 */
class ClusterSingletonManager(
  singletonProps: Option[Any] ⇒ Props,
  singletonName: String,
  terminationMessage: Any,
  maxHandOverRetries: Int = 20,
  maxTakeOverRetries: Int = 15,
  retryInterval: FiniteDuration = 1.second,
  loggingEnabled: Boolean = true)
  extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] {

  // to ensure that new leader doesn't start singleton actor before previous is stopped for certain corner cases
  require(maxTakeOverRetries < maxHandOverRetries,
    s"maxTakeOverRetries [${maxTakeOverRetries}] must be < maxHandOverRetries [${maxHandOverRetries}]")

  /**
   * Full Java API constructor.
   */
  def this(
    singletonName: String,
    terminationMessage: Any,
    maxHandOverRetries: Int,
    maxTakeOverRetries: Int,
    retryInterval: FiniteDuration,
    loggingEnabled: Boolean,
    singletonPropsFactory: ClusterSingletonPropsFactory) =
    this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage,
      maxHandOverRetries, maxTakeOverRetries, retryInterval, loggingEnabled)

  /**
   * Java API constructor with default values.
   */
  def this(
    singletonName: String,
    terminationMessage: Any,
    singletonPropsFactory: ClusterSingletonPropsFactory) =
    this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage)
  import ClusterSingletonManager._
  import ClusterSingletonManager.Internal._
  import ClusterSingletonManager.Internal.LeaderChangedBuffer._

  val cluster = Cluster(context.system)
  val selfAddressOption = Some(cluster.selfAddress)
  // started when self member is Up
  var leaderChangedBuffer: ActorRef = _
  // true when the previous GetNext request has been delivered and a new GetNext may be sent
  var leaderChangedReceived = true

  // keep track of previously downed members
  var downed = Map.empty[Address, Deadline]
  // keep track of previously removed members
  var removed = Map.empty[Address, Deadline]

  def addDowned(address: Address): Unit =
    downed += address -> (Deadline.now + 15.minutes)

  def addRemoved(address: Address): Unit =
    removed += address -> (Deadline.now + 15.minutes)

  def cleanupOverdueNotMemberAnyMore(): Unit = {
    downed = downed filter { case (address, deadline) ⇒ deadline.hasTimeLeft }
    removed = removed filter { case (address, deadline) ⇒ deadline.hasTimeLeft }
  }

  def logInfo(message: String): Unit =
    if (loggingEnabled) log.info(message)

  def logInfo(template: String, arg1: Any): Unit =
    if (loggingEnabled) log.info(template, arg1)

  def logInfo(template: String, arg1: Any, arg2: Any): Unit =
    if (loggingEnabled) log.info(template, arg1, arg2)

  override def preStart(): Unit = {
    super.preStart()
    require(!cluster.isTerminated, "Cluster node must not be terminated")

    // subscribe to cluster changes, re-subscribe on restart
    cluster.subscribe(self, classOf[MemberDowned])
    cluster.subscribe(self, classOf[MemberRemoved])

    setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)

    // defer subscription to LeaderChanged to avoid some jitter when
    // starting/joining several nodes at the same time
    cluster.registerOnMemberUp(self ! StartLeaderChangedBuffer)
  }

  override def postStop(): Unit = {
    cancelTimer(CleanupTimer)
    cluster.unsubscribe(self)
    super.postStop()
  }
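  // the ClusterSingletonManager peer at the given cluster node, i.e. the actor
  // with the same path as self but on the address `at`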
  def peer(at: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(at))

  def getNextLeaderChanged(): Unit =
    if (leaderChangedReceived) {
      leaderChangedReceived = false
      leaderChangedBuffer ! GetNext
    }

  startWith(Start, Uninitialized)
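  // in Start the manager waits for member-up (StartLeaderChangedBuffer) and the
  // initial leader information before deciding its initial role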
  when(Start) {
    case Event(StartLeaderChangedBuffer, _) ⇒
      leaderChangedBuffer = context.actorOf(Props[LeaderChangedBuffer].withDispatcher(context.props.dispatcher))
      getNextLeaderChanged()
      stay

    case Event(InitialLeaderState(leaderOption, memberCount), _) ⇒
      leaderChangedReceived = true
      if (leaderOption == selfAddressOption && memberCount == 1)
        // alone, leader immediately
        gotoLeader(None)
      else if (leaderOption == selfAddressOption)
        goto(BecomingLeader) using BecomingLeaderData(None)
      else
        goto(NonLeader) using NonLeaderData(leaderOption)
  }

  when(NonLeader) {
    case Event(LeaderChanged(leaderOption), NonLeaderData(previousLeaderOption)) ⇒
      leaderChangedReceived = true
      if (leaderOption == selfAddressOption) {
        logInfo("NonLeader observed LeaderChanged: [{} -> myself]", previousLeaderOption)
        previousLeaderOption match {
          case None ⇒ gotoLeader(None)
          case Some(prev) if downed.contains(prev) ⇒ gotoLeader(None)
          case Some(prev) ⇒
            peer(prev) ! HandOverToMe
            goto(BecomingLeader) using BecomingLeaderData(previousLeaderOption)
        }
      } else {
        logInfo("NonLeader observed LeaderChanged: [{} -> {}]", previousLeaderOption, leaderOption)
        getNextLeaderChanged()
        stay using NonLeaderData(leaderOption)
      }

    case Event(MemberDowned(m), NonLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒
      logInfo("Previous leader downed [{}]", m.address)
      addDowned(m.address)
      // transition when LeaderChanged
      stay using NonLeaderData(None)

    case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒
      logInfo("Self removed, stopping ClusterSingletonManager")
      stop()
  }
  when(BecomingLeader) {

    case Event(HandOverInProgress, _) ⇒
      // confirmation that the hand-over process has started
      logInfo("Hand-over in progress at [{}]", sender.path.address)
      cancelTimer(HandOverRetryTimer)
      stay

    case Event(HandOverDone(handOverData), BecomingLeaderData(Some(previousLeader))) ⇒
      if (sender.path.address == previousLeader)
        gotoLeader(handOverData)
      else {
        logInfo("Ignoring HandOverDone in BecomingLeader from [{}]. Expected previous leader [{}]",
          sender.path.address, previousLeader)
        stay
      }

    case Event(MemberDowned(m), BecomingLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒
      logInfo("Previous leader [{}] downed", previousLeader)
      addDowned(m.address)
      gotoLeader(None)

    case Event(TakeOverFromMe, BecomingLeaderData(None)) ⇒
      sender ! HandOverToMe
      stay using BecomingLeaderData(Some(sender.path.address))

    case Event(TakeOverFromMe, BecomingLeaderData(Some(previousLeader))) ⇒
      if (previousLeader == sender.path.address) sender ! HandOverToMe
      else logInfo("Ignoring TakeOver request in BecomingLeader from [{}]. Expected previous leader [{}]",
        sender.path.address, previousLeader)
      stay

    case Event(HandOverRetry(count), BecomingLeaderData(previousLeaderOption)) ⇒
      if (count <= maxHandOverRetries) {
        logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousLeaderOption)
        previousLeaderOption foreach { peer(_) ! HandOverToMe }
        setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false)
      } else if (previousLeaderOption.isEmpty) {
        // can't send HandOverToMe, previousLeader unknown for new node (or restart)
        // previous leader might be down or removed, so no TakeOverFromMe message is received
        logInfo("Timeout in BecomingLeader. Previous leader unknown and no TakeOver request.")
        gotoLeader(None)
      } else
        throw new ClusterSingletonManagerIsStuck(
          s"Becoming singleton leader was stuck because previous leader [${previousLeaderOption}] is unresponsive")

  }
  def gotoLeader(handOverData: Option[Any]): State = {
    logInfo("Singleton manager [{}] starting singleton actor", cluster.selfAddress)
    val singleton = context watch context.actorOf(singletonProps(handOverData), singletonName)
    goto(Leader) using LeaderData(singleton)
  }
  when(Leader) {
    case Event(LeaderChanged(leaderOption), LeaderData(singleton, singletonTerminated, handOverData)) ⇒
      leaderChangedReceived = true
      logInfo("Leader observed LeaderChanged: [{} -> {}]", cluster.selfAddress, leaderOption)
      leaderOption match {
        case Some(a) if a == cluster.selfAddress ⇒
          // already leader
          stay
        case Some(a) if downed.contains(a) || removed.contains(a) ⇒
          gotoHandingOver(singleton, singletonTerminated, handOverData, None)
        case Some(a) ⇒
          // send TakeOver request in case the new leader doesn't know previous leader
          val leaderPeer = peer(a)
          leaderPeer ! TakeOverFromMe
          setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false)
          goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a)
        case _ ⇒
          // new leader will initiate the hand-over
          stay
      }

    case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData)) ⇒
      gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))

    case Event(singletonHandOverMessage, d @ LeaderData(singleton, _, _)) if sender == singleton ⇒
      stay using d.copy(handOverData = Some(singletonHandOverMessage))

    case Event(Terminated(ref), d @ LeaderData(singleton, _, _)) if ref == singleton ⇒
      stay using d.copy(singletonTerminated = true)
  }
  when(WasLeader) {
    case Event(TakeOverRetry(leaderPeer, count), _) ⇒
      val newLeader = leaderPeer.path.address
      if (count <= maxTakeOverRetries) {
        logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeader)
        leaderPeer ! TakeOverFromMe
        setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false)
        stay
      } else
        throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newLeader}] never occurred")

    case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒
      gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))

    case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, newLeader)) if m.address == newLeader ⇒
      addDowned(m.address)
      gotoHandingOver(singleton, singletonTerminated, handOverData, None)

    case Event(singletonHandOverMessage, d @ WasLeaderData(singleton, _, _, _)) if sender == singleton ⇒
      stay using d.copy(handOverData = Some(singletonHandOverMessage))

    case Event(Terminated(ref), d @ WasLeaderData(singleton, _, _, _)) if ref == singleton ⇒
      stay using d.copy(singletonTerminated = true)

  }
  def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], handOverTo: Option[ActorRef]): State = {
    if (singletonTerminated) {
      handOverDone(handOverTo, handOverData)
    } else {
      handOverTo foreach { _ ! HandOverInProgress }
      singleton ! terminationMessage
      goto(HandingOver) using HandingOverData(singleton, handOverTo, handOverData)
    }
  }
  when(HandingOver) {
    case Event(Terminated(ref), HandingOverData(singleton, handOverTo, handOverData)) if ref == singleton ⇒
      handOverDone(handOverTo, handOverData)

    case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo, _)) if handOverTo == Some(sender) ⇒
      // retry
      sender ! HandOverInProgress
      stay

    case Event(singletonHandOverMessage, d @ HandingOverData(singleton, _, _)) if sender == singleton ⇒
      stay using d.copy(handOverData = Some(singletonHandOverMessage))

  }
  def handOverDone(handOverTo: Option[ActorRef], handOverData: Option[Any]): State = {
    val newLeader = handOverTo.map(_.path.address)
    logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newLeader)
    handOverTo foreach { _ ! HandOverDone(handOverData) }
    goto(NonLeader) using NonLeaderData(newLeader)
  }
  whenUnhandled {
    case Event(_: CurrentClusterState, _) ⇒ stay
    case Event(MemberRemoved(m), _) ⇒
      logInfo("Member removed [{}]", m.address)
      // if self removed, it will be stopped onTransition to NonLeader
      addRemoved(m.address)
      stay
    case Event(MemberDowned(m), _) ⇒
      logInfo("Member downed [{}]", m.address)
      addDowned(m.address)
      stay
    case Event(TakeOverFromMe, _) ⇒
      logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender.path.address)
      stay
    case Event(Cleanup, _) ⇒
      cleanupOverdueNotMemberAnyMore()
      stay
  }
  onTransition {
    case from -> to ⇒ logInfo("ClusterSingletonManager state change [{} -> {}]", from, to)
  }

  onTransition {
    case _ -> BecomingLeader ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false)
  }

  onTransition {
    case BecomingLeader -> _ ⇒ cancelTimer(HandOverRetryTimer)
    case WasLeader -> _ ⇒ cancelTimer(TakeOverRetryTimer)
  }

  onTransition {
    case _ -> (NonLeader | Leader) ⇒ getNextLeaderChanged()
  }

  onTransition {
    case _ -> NonLeader if removed.contains(cluster.selfAddress) || downed.contains(cluster.selfAddress) ⇒
      logInfo("Self removed, stopping ClusterSingletonManager")
      stop()
  }

}